Engine API
Connection and execution management. Use these helpers when you need lower-level control than the high-level Database and TableHandle APIs.
Database
Table access primitives.
This module provides the core database and table access functionality:
- Database: Main database connection and query interface
- TableHandle: Lightweight reference to a database table
- Transaction: Transaction context manager
The Database class is the primary entry point for all database operations,
including table creation, querying, and data mutations.
- class moltres.table.table.Database(config: MoltresConfig)
Bases: object
Entry-point object returned by moltres.connect().
The Database class provides the main interface for all database operations in Moltres. It handles connections, query execution, table management, and data mutations.
The Database class supports the context manager protocol for automatic connection cleanup. Use it in a with statement to ensure the connection is properly closed.
- config
The MoltresConfig instance used for this database
- dialect
The SQL dialect being used (e.g., “sqlite”, “postgresql”)
Example
Basic usage:
>>> from moltres import connect, col
>>> db = connect("sqlite:///example.db")
>>> df = db.table("users").select().where(col("active") == True)
>>> results = df.collect()
>>> db.close()
Using context manager (recommended):
>>> with connect("sqlite:///example.db") as db:
...     df = db.table("users").select().where(col("active") == True)
...     results = df.collect()
...     # db.close() called automatically on exit
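The cleanup guarantee described above is the ordinary Python context manager pattern. As a rough illustration that does not use Moltres at all, the sketch below uses the standard library's sqlite3 module with contextlib.closing to show a connection being closed when the with block exits. The table and column names are placeholders.

```python
import sqlite3
from contextlib import closing

# closing() calls conn.close() when the block exits, even if the body raises.
with closing(sqlite3.connect(":memory:")) as conn:
    conn.execute("CREATE TABLE users (id INTEGER, active INTEGER)")
    conn.execute("INSERT INTO users VALUES (1, 1)")
    active_ids = [row[0] for row in conn.execute("SELECT id FROM users WHERE active = 1")]

# Using the connection after the block fails because it has been closed.
try:
    conn.execute("SELECT 1")
    closed_after_exit = False
except sqlite3.ProgrammingError:
    closed_after_exit = True
```

This is why the with form is recommended: the close happens on every exit path without an explicit call.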
- batch() -> OperationBatch
Create a batch context for grouping multiple operations.
All operations within the batch context are executed together in a single transaction when the context exits. If any exception occurs, all operations are rolled back.
- Returns:
OperationBatch context manager
Example
>>> with db.batch():
...     db.create_table("users", [...])
...     # All operations execute together on exit
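To make the "queue now, execute together on exit" behavior concrete, here is a minimal sketch built on the standard library's sqlite3 module. OperationBatchSketch and its add() method are hypothetical names invented for this illustration; they are not part of Moltres and do not reflect how OperationBatch is implemented.

```python
import sqlite3


class OperationBatchSketch:
    """Hypothetical stand-in: queue statements, run them in one transaction on exit."""

    def __init__(self, conn):
        self.conn = conn
        self.pending = []

    def add(self, sql, params=()):
        self.pending.append((sql, params))  # nothing is executed yet

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            self.pending.clear()  # the body failed, so nothing is executed
            return False
        self.conn.execute("BEGIN")
        try:
            for sql, params in self.pending:
                self.conn.execute(sql, params)
        except Exception:
            self.conn.execute("ROLLBACK")  # undo every statement in the batch
            raise
        self.conn.execute("COMMIT")
        return False


def count_users(conn):
    return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]


# isolation_level=None gives explicit control over BEGIN/COMMIT/ROLLBACK.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

with OperationBatchSketch(conn) as batch:
    batch.add("INSERT INTO users VALUES (?, ?)", (1, "Alice"))
    batch.add("INSERT INTO users VALUES (?, ?)", (2, "Bob"))
    rows_before_exit = count_users(conn)
rows_after_exit = count_users(conn)

try:
    with OperationBatchSketch(conn) as batch:
        batch.add("INSERT INTO users VALUES (?, ?)", (3, "Carol"))
        batch.add("INSERT INTO users VALUES (?, ?)", (1, "Duplicate"))  # primary key clash
except sqlite3.IntegrityError:
    pass
rows_after_failure = count_users(conn)
```

In the failing batch the first insert succeeds inside the transaction, but the rollback removes it again, so the table is left as it was.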
- close() -> None
Close all database connections and dispose of the engine.
This should be called when done with the database connection, especially for ephemeral test databases.
Note: After calling close(), the Database instance should not be used.
- compile_plan(plan: LogicalPlan) -> Select
Compile a logical plan to a SQLAlchemy Select statement.
- property connection_manager: ConnectionManager
Get the connection manager for this database.
- Returns:
The connection manager instance
- Return type:
ConnectionManager
- createDataFrame(data: Sequence[dict[str, Any]] | Sequence[tuple] | Records | 'LazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) -> DataFrame
Create a DataFrame from Python data.
Delegates to EphemeralTableManager.
Creates a temporary table, inserts the data, and returns a DataFrame querying from that table. If LazyRecords is provided, it is auto-materialized. If a pandas or polars DataFrame, or a polars LazyFrame, is provided, it is converted to Records with lazy conversion.
- Parameters:
data – Input data in one of the supported formats:
  - List of dicts: [{"col1": val1, "col2": val2}, …]
  - List of tuples: Requires schema parameter with column names
  - Records object: Extracts data and schema if available
  - LazyRecords object: Auto-materializes and extracts data and schema
  - pandas DataFrame: Converts to Records with schema preservation
  - polars DataFrame: Converts to Records with schema preservation
  - polars LazyFrame: Materializes and converts to Records with schema preservation
schema – Optional explicit schema. If not provided, schema is inferred from data.
pk – Optional column name(s) to mark as primary key. Can be a single string or sequence of strings for composite keys.
auto_pk – Optional column name(s) to create as auto-incrementing primary key. Can specify same name as pk to make an existing column auto-incrementing.
- Returns:
DataFrame querying from the created temporary table
- Raises:
ValueError – If data is empty and no schema provided, or if primary key requirements are not met
ValidationError – If list of tuples provided without schema, or other validation errors
Example
>>> # Create DataFrame from list of dicts
>>> df = db.createDataFrame([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], pk="id")
>>> # Create DataFrame with auto-incrementing primary key
>>> df = db.createDataFrame([{"name": "Alice"}, {"name": "Bob"}], auto_pk="id")
>>> # Create DataFrame from Records
>>> from moltres.io.records import Records
>>> records = Records(_data=[{"id": 1, "name": "Alice"}], _database=db)
>>> df = db.createDataFrame(records, pk="id")
>>> # Create DataFrame from LazyRecords (auto-materializes)
>>> lazy_records = db.read.records.csv("data.csv")
>>> df = db.createDataFrame(lazy_records, pk="id")
>>> # Create DataFrame from pandas DataFrame
>>> import pandas as pd
>>> pdf = pd.DataFrame([{"id": 1, "name": "Alice"}])
>>> df = db.createDataFrame(pdf, pk="id")
>>> # Create DataFrame from polars DataFrame
>>> import polars as pl
>>> plf = pl.DataFrame([{"id": 1, "name": "Alice"}])
>>> df = db.createDataFrame(plf, pk="id")
- create_dataframe(data: Sequence[dict[str, object]] | Sequence[tuple] | Records | 'LazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) -> DataFrame
Create a DataFrame from Python data (snake_case alias for createDataFrame).
This is an alias for createDataFrame(). See createDataFrame() for full documentation.
- Parameters:
data – Input data in one of the supported formats
schema – Optional explicit schema
pk – Optional column name(s) to mark as primary key
auto_pk – Optional column name(s) to create as auto-incrementing primary key
- Returns:
DataFrame querying from the created temporary table
- create_index(name: str, table: str, columns: str | Sequence[str], *, unique: bool = False, if_not_exists: bool = True) -> CreateIndexOperation
Create a lazy create index operation.
Delegates to DDLManager.
- Parameters:
name – Name of the index to create
table – Name of the table to create the index on
columns – Column name(s) to index (single string or sequence)
unique – If True, create a UNIQUE index (default: False)
if_not_exists – If True, don’t error if index already exists (default: True)
- Returns:
CreateIndexOperation that executes on collect()
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("email", "TEXT"), column("name", "TEXT"), column("age", "INTEGER")]).collect()
>>> # Create single-column index
>>> op = db.create_index("idx_email", "users", "email")
>>> op.collect()  # Executes the CREATE INDEX
>>> # Multi-column index
>>> op2 = db.create_index("idx_name_age", "users", ["name", "age"], unique=True)
>>> op2.collect()
>>> db.close()
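The DDL methods here return an operation object and do nothing until collect() is called. The sketch below illustrates that deferred-execution idea with the standard library's sqlite3 module. LazyIndexOperation, to_sql(), and index_names() are hypothetical names for this illustration only, and the generated SQL is an assumption, not what Moltres emits.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class LazyIndexOperation:
    """Hypothetical stand-in for a lazy DDL operation: holds a description until collect()."""

    conn: sqlite3.Connection
    name: str
    table: str
    columns: tuple
    unique: bool = False
    if_not_exists: bool = True

    def to_sql(self) -> str:
        unique = "UNIQUE " if self.unique else ""
        guard = "IF NOT EXISTS " if self.if_not_exists else ""
        cols = ", ".join(f'"{c}"' for c in self.columns)
        return f'CREATE {unique}INDEX {guard}"{self.name}" ON "{self.table}" ({cols})'

    def collect(self) -> None:
        # The statement only reaches the database here.
        self.conn.execute(self.to_sql())


def index_names(conn):
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'index'")
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, email TEXT)")

op = LazyIndexOperation(conn, "idx_email", "users", ("email",))
before = index_names(conn)  # building the operation has not touched the database
op.collect()
after = index_names(conn)
```

Separating construction from execution is what lets operations be inspected, batched, or discarded before any SQL runs.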
- create_table(name: str, columns: Sequence[ColumnDef], *, if_not_exists: bool = True, temporary: bool = False, constraints: Sequence['UniqueConstraint' | 'CheckConstraint' | 'ForeignKeyConstraint'] | None = None) -> CreateTableOperation
- create_table(model_class: Type['DeclarativeBase'], *, if_not_exists: bool = True, temporary: bool = False) -> CreateTableOperation
Create a lazy create table operation.
Delegates to DDLManager.
- Parameters:
name_or_model – Name of the table to create, or SQLAlchemy model class
columns – Sequence of ColumnDef objects defining the table schema (required if name_or_model is str)
if_not_exists – If True, don’t error if table already exists (default: True)
temporary – If True, create a temporary table (default: False)
constraints – Optional sequence of constraint objects (UniqueConstraint, CheckConstraint, ForeignKeyConstraint). Ignored if model_class is provided (constraints are extracted from model).
- Returns:
CreateTableOperation that executes on collect()
- Raises:
ValidationError – If table name or columns are invalid
ValueError – If model_class is not a valid SQLAlchemy model
Example
>>> from moltres import connect
>>> from moltres.table.schema import column, unique, check
>>> db = connect("sqlite:///:memory:")
>>> # Create table with constraints
>>> op = db.create_table(
...     "users",
...     [column("id", "INTEGER", primary_key=True), column("email", "TEXT")],
...     constraints=[unique("email"), check("id > 0", name="ck_positive_id")]
... )
>>> table = op.collect()  # Executes the CREATE TABLE
>>> # Verify table was created
>>> tables = db.get_table_names()
>>> "users" in tables
True
>>> db.close()
- delete(table_name: str, *, where: Column) -> int
Delete rows from a table.
Convenience method for deleting data from a table.
- Parameters:
table_name – Name of the table to delete from
where – Column expression for the WHERE clause
- Returns:
Number of rows deleted
- Raises:
ValidationError – If table name is invalid
Example
>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], _database=db).insert_into("users")
>>> # Delete rows
>>> count = db.delete("users", where=col("id") == 1)
>>> count
1
>>> # Verify deletion
>>> df = db.table("users").select()
>>> results = df.collect()
>>> len(results)
1
>>> results[0]["name"]
'Bob'
>>> db.close()
- property dialect: DialectSpec
- drop_index(name: str, table: str | None = None, *, if_exists: bool = True) -> DropIndexOperation
Create a lazy drop index operation.
- Parameters:
name – Name of the index to drop
table – Optional table name (required for some dialects like MySQL)
if_exists – If True, don’t error if index doesn’t exist (default: True)
- Returns:
DropIndexOperation that executes on collect()
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("email", "TEXT")]).collect()
>>> db.create_index("idx_email", "users", "email").collect()
>>> # Drop index
>>> op = db.drop_index("idx_email", "users")
>>> op.collect()  # Executes the DROP INDEX
>>> db.close()
- drop_table(name: str, *, if_exists: bool = True) -> DropTableOperation
Create a lazy drop table operation.
Delegates to DDLManager.
- Parameters:
name – Name of the table to drop
if_exists – If True, don’t error if table doesn’t exist (default: True)
- Returns:
DropTableOperation that executes on collect()
Example
>>> op = db.drop_table("users")
>>> op.collect()  # Executes the DROP TABLE
- execute_plan(plan: LogicalPlan, model: Type[Any] | None = None) -> QueryResult
Execute a logical plan and return results.
Delegates to DatabaseQueryExecutor.
- execute_plan_stream(plan: LogicalPlan) -> Iterator[List[Dict[str, object]]]
Execute a plan and return an iterator of row chunks.
Delegates to DatabaseQueryExecutor.
- execute_sql(sql: str, params: Dict[str, Any] | None = None) -> QueryResult
Execute a raw SQL query.
Delegates to DatabaseQueryExecutor.
- property executor: QueryExecutor
Get the query executor for this database.
- Returns:
The query executor instance
- Return type:
QueryExecutor
- explain(sql: str, params: Dict[str, Any] | None = None) -> str
Get the execution plan for a SQL query.
Delegates to DatabaseQueryExecutor.
- Parameters:
sql – SQL query string
params – Optional query parameters
- Returns:
Execution plan as a string (dialect-specific)
Example
>>> plan = db.explain("SELECT * FROM users WHERE id = :id", params={"id": 1})
>>> print(plan)
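Because the returned plan is dialect-specific, it helps to see what one dialect produces. The sketch below uses the standard library's sqlite3 module directly, not Moltres, to run SQLite's EXPLAIN QUERY PLAN on the same parameterized query. How explain() builds its string is not documented here, so treat this only as an example of the kind of text a dialect can return.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# SQLite returns one row per plan step; the last column is the readable detail.
plan_rows = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM users WHERE id = :id",
    {"id": 1},
).fetchall()
plan_text = "\n".join(row[-1] for row in plan_rows)
print(plan_text)
```

The exact wording of the detail column varies between SQLite versions, so plan text is better read by a person than parsed by code.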
- classmethod from_connection(connection: sqlalchemy.engine.Connection, **options: object) -> Database
Create a Database instance from an existing SQLAlchemy Connection.
This allows you to use Moltres with an existing SQLAlchemy Connection, enabling integration within existing transactions.
Note: The Database will use the Connection’s engine, but will not manage the Connection’s lifecycle. The user is responsible for managing the connection.
- Parameters:
connection – SQLAlchemy Connection instance
**options – Optional configuration parameters (same as from_engine)
- Returns:
Database instance configured to use the Connection’s engine
Example
>>> from sqlalchemy import create_engine
>>> from moltres import Database
>>> engine = create_engine("sqlite:///:memory:")
>>> with engine.connect() as conn:
...     db = Database.from_connection(conn)
...     # Use Moltres within the connection's transaction
...     from moltres.table.schema import column
...     _ = db.create_table("users", [column("id", "INTEGER")]).collect()
- classmethod from_engine(engine: Engine, **options: object) -> Database
Create a Database instance from an existing SQLAlchemy Engine.
This allows you to use Moltres with an existing SQLAlchemy Engine, enabling integration with existing SQLAlchemy projects.
- Parameters:
engine – SQLAlchemy Engine instance
**options – Optional configuration parameters:
  - echo: Enable SQLAlchemy echo mode
  - fetch_format: Result format, one of "records", "pandas", or "polars"
  - dialect: Override SQL dialect detection
  - query_timeout: Query execution timeout in seconds
  - Other options are stored in config.options
- Returns:
Database instance configured to use the provided Engine
- Return type:
Database
Example
>>> from sqlalchemy import create_engine
>>> from moltres import Database
>>> engine = create_engine("sqlite:///:memory:")
>>> db = Database.from_engine(engine)
>>> # Now use Moltres with your existing engine
>>> from moltres.table.schema import column
>>> _ = db.create_table("users", [column("id", "INTEGER")]).collect()
- classmethod from_session(session: Session, **options: object) -> Database
Create a Database instance from a SQLAlchemy ORM Session.
This allows you to use Moltres with an existing SQLAlchemy ORM Session, enabling integration with ORM-based applications.
Note: The Database will use the Session’s bind/engine, but will not manage the Session’s lifecycle. The user is responsible for managing the session.
- Parameters:
session – SQLAlchemy ORM Session instance
**options – Optional configuration parameters (same as from_engine)
- Returns:
Database instance configured to use the Session’s bind/engine
Example
>>> from sqlalchemy import create_engine
>>> from sqlalchemy.orm import sessionmaker
>>> from moltres import Database
>>> engine = create_engine("sqlite:///:memory:")
>>> Session = sessionmaker(bind=engine)
>>> with Session() as session:
...     db = Database.from_session(session)
...     # Use Moltres with your existing session
...     from moltres.table.schema import column
...     _ = db.create_table("users", [column("id", "INTEGER")]).collect()
- get_columns(table_name: str) -> List['ColumnInfo']
Get column information for a table.
- Parameters:
table_name – Name of the table to inspect
- Returns:
List of ColumnInfo objects with column metadata
- Raises:
ValidationError – If table name is invalid
ValueError – If database connection is not available
RuntimeError – If table does not exist or cannot be inspected
Example
>>> columns = db.get_columns("users")
>>> # Returns: [ColumnInfo(name='id', type_name='INTEGER', ...), ...]
- get_table_names(schema: str | None = None) -> List[str]
Get list of table names in the database.
- Parameters:
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
- Returns:
List of table names
- Raises:
ValueError – If database connection is not available
RuntimeError – If inspection fails
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER")]).collect()
>>> db.create_table("orders", [column("id", "INTEGER")]).collect()
>>> # Get all table names
>>> tables = db.get_table_names()
>>> "users" in tables
True
>>> "orders" in tables
True
>>> db.close()
- get_transaction_status() -> dict[str, object] | None
Get transaction status and metadata if a transaction is active.
- Returns:
Dictionary with transaction metadata, or None if no transaction is active. The dictionary includes:
readonly: Whether the transaction is read-only
isolation_level: Transaction isolation level (if set)
timeout: Transaction timeout in seconds (if set)
savepoints: List of active savepoint names
- Return type:
dict[str, object] | None
Example
>>> with db.transaction(isolation_level="SERIALIZABLE", readonly=True) as txn:
...     status = db.get_transaction_status()
...     assert status["isolation_level"] == "SERIALIZABLE"
...     assert status["readonly"] is True
- get_view_names(schema: str | None = None) -> List[str]
Get list of view names in the database.
- Parameters:
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
- Returns:
List of view names
- Raises:
ValueError – If database connection is not available
RuntimeError – If inspection fails
Example
>>> views = db.get_view_names()
>>> # Returns: ['active_users_view', 'order_summary_view']
- insert(table_name: str, rows: Sequence[Mapping[str, object]] | 'Records' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame') -> int
Insert rows into a table.
Delegates to TableManager.
- Parameters:
table_name – Name of the table to insert into
rows – Sequence of row dictionaries, Records, pandas DataFrame, polars DataFrame, or polars LazyFrame
- Returns:
Number of rows inserted
- Raises:
ValidationError – If table name is invalid or rows are empty
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> # Insert rows
>>> count = db.insert("users", [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}])
>>> count
2
>>> # Verify insertion
>>> df = db.table("users").select()
>>> results = df.collect()
>>> len(results)
2
>>> results[0]["name"]
'Alice'
>>> db.close()
- is_in_transaction() -> bool
Check if currently in a transaction.
- Returns:
True if a transaction is active, False otherwise
Example
>>> if db.is_in_transaction():
...     status = db.get_transaction_status()
...     print(f"Isolation: {status['isolation_level']}")
- property load: DataLoader
Return a DataLoader for loading data from files and tables as DataFrames.
Note: For SQL operations on tables, use db.table(name).select() instead.
- merge(table_name: str, rows: Sequence[Mapping[str, object]] | 'Records' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', *, on: Sequence[str], when_matched: Mapping[str, object] | None = None, when_not_matched: Mapping[str, object] | None = None) -> int
Merge (upsert) rows into a table.
Convenience method for merging data into a table with conflict resolution.
- Parameters:
table_name – Name of the table to merge into
rows – Sequence of row dictionaries, Records, pandas DataFrame, polars DataFrame, or polars LazyFrame
on – Sequence of column names that form the conflict key
when_matched – Optional dictionary of column updates when a conflict occurs
when_not_matched – Optional dictionary of default values when inserting new rows
- Returns:
Number of rows affected (inserted or updated)
- Raises:
ValidationError – If table name is invalid, rows are empty, or on columns are invalid
Example
>>> db.merge(
...     "users",
...     [{"id": 1, "name": "Alice", "email": "alice@example.com"}],
...     on=["id"],
...     when_matched={"name": "Alice Updated"}
... )
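For readers unfamiliar with upsert semantics, the sketch below shows the insert-or-update behavior using plain SQL through the standard library's sqlite3 module. It assumes SQLite 3.24 or newer for the ON CONFLICT ... DO UPDATE clause. Whether Moltres compiles merge() to this statement is not stated in this reference, so this illustrates the concept rather than the implementation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'Alice', 'alice@example.com')")

# "id" plays the role of the conflict key; only "name" is updated on a match.
upsert = """
    INSERT INTO users (id, name, email) VALUES (:id, :name, :email)
    ON CONFLICT (id) DO UPDATE SET name = excluded.name
"""

# id 1 already exists, so the existing row is updated instead of duplicated.
conn.execute(upsert, {"id": 1, "name": "Alice Updated", "email": "ignored@example.com"})
# id 2 does not exist, so a new row is inserted.
conn.execute(upsert, {"id": 2, "name": "Bob", "email": "bob@example.com"})

rows = conn.execute("SELECT id, name, email FROM users ORDER BY id").fetchall()
```

The matched row keeps its original email because the update clause names only the columns to change, which is the role when_matched plays in the signature above.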
- property read: ReadAccessor
Return a ReadAccessor for accessing read operations.
Use db.read.records.* for Records-based reads (backward compatibility). Use db.load.* for DataFrame-based reads (PySpark-style).
- reflect(schema: str | None = None, views: bool = False) -> Dict[str, 'TableSchema']
Reflect entire database schema.
- Parameters:
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
views – If True, also reflect views (default: False)
- Returns:
Dictionary mapping table/view names to TableSchema objects
- Raises:
ValueError – If database connection is not available
RuntimeError – If reflection fails
Example
>>> schemas = db.reflect()
>>> # Returns: {'users': TableSchema(...), 'orders': TableSchema(...)}
- reflect_table(name: str, schema: str | None = None) -> TableSchema
Reflect a single table from the database.
- Parameters:
name – Name of the table to reflect
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
- Returns:
TableSchema object with table metadata
- Raises:
ValidationError – If table name is invalid
ValueError – If database connection is not available
RuntimeError – If table does not exist or reflection fails
Example
>>> schema = db.reflect_table("users")
>>> # Returns: TableSchema(name='users', columns=[ColumnDef(...), ...])
- scan_csv(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) -> PolarsDataFrame
Scan a CSV file as a PolarsDataFrame (Polars-style).
- Parameters:
path – Path to the CSV file
schema – Optional explicit schema
**options – Format-specific options (e.g., header=True, delimiter=",")
- Returns:
PolarsDataFrame containing the CSV data (lazy)
Example
>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_csv("data.csv", header=True)
>>> results = df.collect()
- scan_json(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) -> PolarsDataFrame
Scan a JSON file (array of objects) as a PolarsDataFrame (Polars-style).
- Parameters:
path – Path to the JSON file
schema – Optional explicit schema
**options – Format-specific options (e.g., multiline=True)
- Returns:
PolarsDataFrame containing the JSON data (lazy)
Example
>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_json("data.json")
>>> results = df.collect()
- scan_jsonl(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) -> PolarsDataFrame
Scan a JSONL file (one JSON object per line) as a PolarsDataFrame (Polars-style).
- Parameters:
path – Path to the JSONL file
schema – Optional explicit schema
**options – Format-specific options
- Returns:
PolarsDataFrame containing the JSONL data (lazy)
Example
>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_jsonl("data.jsonl")
>>> results = df.collect()
- scan_parquet(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) -> PolarsDataFrame
Scan a Parquet file as a PolarsDataFrame (Polars-style).
- Parameters:
path – Path to the Parquet file
schema – Optional explicit schema
**options – Format-specific options
- Returns:
PolarsDataFrame containing the Parquet data (lazy)
- Raises:
RuntimeError – If pandas or pyarrow are not installed
Example
>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_parquet("data.parquet")
>>> results = df.collect()
- scan_text(path: str, column_name: str = 'value', schema: Sequence[ColumnDef] | None = None, **options: object) -> PolarsDataFrame
Scan a text file as a single-column PolarsDataFrame (Polars-style).
- Parameters:
path – Path to the text file
column_name – Name of the column to create (default: “value”)
schema – Optional explicit schema
**options – Format-specific options
- Returns:
PolarsDataFrame containing the text file lines (lazy)
Example
>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_text("data.txt", column_name="line")
>>> results = df.collect()
- schema(table_name: str) -> List[ColumnDef]
Get the schema (column definitions) for a table.
- Parameters:
table_name – Name of the table
- Returns:
List of ColumnDef objects describing the table’s columns
- Raises:
ValueError – If table does not exist
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> schema = db.schema("users")
>>> len(schema)
2
>>> schema[0].name
'id'
>>> schema[0].type_name
'INTEGER'
- show_schema(table_name: str) -> None
Print a formatted schema for a table.
Convenience method for interactive exploration.
- Parameters:
table_name – Name of the table
Example
>>> db.show_schema("users")
Schema for table 'users':
  - id: INTEGER (primary_key=True)
  - name: TEXT
  - email: TEXT
- show_tables(schema: str | None = None) -> None
Print a formatted list of tables in the database.
Convenience method for interactive exploration.
- Parameters:
schema – Optional schema name
Example
>>> db.show_tables()
Tables in database:
  - users
  - orders
  - products
- sql(sql: str, **params: object) -> DataFrame
Execute a SQL query and return a DataFrame.
Similar to PySpark’s spark.sql(), this method accepts a raw SQL string and returns a lazy DataFrame that can be chained with further operations. The SQL dialect is determined by the database connection.
- Parameters:
sql – SQL query string to execute
**params – Optional named parameters for parameterized queries. Use :param_name syntax in SQL and pass values as kwargs.
- Returns:
Lazy DataFrame that can be chained with further operations
Example
>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT"), column("age", "INTEGER")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice", "age": 25}, {"id": 2, "name": "Bob", "age": 17}], _database=db).insert_into("users")
>>> # Basic SQL query
>>> df = db.sql("SELECT * FROM users WHERE age > 18")
>>> results = df.collect()
>>> len(results)
1
>>> results[0]["name"]
'Alice'
>>> # Parameterized query
>>> df2 = db.sql("SELECT * FROM users WHERE id = :id", id=1)
>>> results2 = df2.collect()
>>> results2[0]["name"]
'Alice'
>>> # Chaining operations
>>> db.create_table("orders", [column("id", "INTEGER"), column("amount", "REAL")]).collect()
>>> _ = Records(_data=[{"id": 1, "amount": 150.0}, {"id": 2, "amount": 50.0}], _database=db).insert_into("orders")
>>> df3 = db.sql("SELECT * FROM orders").where(col("amount") > 100).limit(1)
>>> results3 = df3.collect()
>>> len(results3)
1
>>> results3[0]["amount"]
150.0
>>> db.close()
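The :param_name placeholder style used by sql() is the same named-parameter style the standard library's sqlite3 module accepts. The sketch below uses sqlite3 directly, not Moltres, to show why binding values is preferable to formatting them into the SQL string: a value containing quote characters is treated as data, not as SQL. The sample rows mirror the example above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO users VALUES (:id, :name, :age)",
    [
        {"id": 1, "name": "Alice", "age": 25},
        {"id": 2, "name": "Bob", "age": 17},
    ],
)

# Named placeholders are filled from a mapping of parameter names to values.
by_id = conn.execute("SELECT name FROM users WHERE id = :id", {"id": 1}).fetchall()

# A hostile-looking value is compared literally, so it matches no rows.
hostile = conn.execute(
    "SELECT name FROM users WHERE name = :name",
    {"name": "Alice' OR '1'='1"},
).fetchall()
```

Had the second value been spliced into the query text, the OR clause would have matched every row.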
- table(name: str) -> TableHandle
- table(model_class: Type['DeclarativeBase']) -> TableHandle
Get a handle to a table in the database.
Delegates to TableManager.
- Parameters:
name_or_model – Name of the table, SQLAlchemy model class, or SQLModel model class
- Returns:
TableHandle for the specified table
- Raises:
ValidationError – If table name is invalid
ValueError – If model_class is not a valid SQLAlchemy or SQLModel model
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> _ = db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice"}], _database=db).insert_into("users")
>>> # Get table handle by name
>>> users = db.table("users")
>>> df = users.select("id", "name")
>>> results = df.collect()
>>> results[0]["name"]
'Alice'
>>> db.close()
- tables(schema: str | None = None) -> Dict[str, List[ColumnDef]]
Get all tables in the database with their schemas.
- Parameters:
schema – Optional schema name (for databases that support schemas)
- Returns:
Dictionary mapping table names to their column definitions
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER")]).collect()
>>> db.create_table("orders", [column("id", "INTEGER")]).collect()
>>> tables = db.tables()
>>> "users" in tables
True
>>> "orders" in tables
True
>>> len(tables["users"])
1
- transaction(savepoint: bool = False, readonly: bool = False, isolation_level: str | None = None, timeout: float | None = None) -> Iterator[Transaction]
Create a transaction context for grouping multiple operations.
All operations within the transaction context share the same transaction. If any exception occurs, the transaction is automatically rolled back. Otherwise, it is committed on successful exit.
- Parameters:
savepoint – If True and a transaction is already active, create a savepoint instead of raising an error. This enables nested transactions.
readonly – If True, set the transaction to read-only mode. Prevents writes.
isolation_level – Optional isolation level. Must be one of:
  - "READ UNCOMMITTED"
  - "READ COMMITTED"
  - "REPEATABLE READ"
  - "SERIALIZABLE"
timeout – Optional transaction timeout in seconds. Database-specific behavior:
  - PostgreSQL: Sets statement_timeout (milliseconds)
  - MySQL: Sets innodb_lock_wait_timeout (seconds)
  - SQLite: Not supported
- Yields:
Transaction object that can be used for explicit commit/rollback
Example
Basic transaction:
>>> with db.transaction() as txn:
...     df.write.insertInto("table")
...     df.write.update("table", where=..., set={...})
...     # If any operation fails, all are rolled back
...     # Otherwise, all are committed on exit
Nested transaction with savepoint:
>>> with db.transaction() as outer:
...     # ... operations ...
...     with db.transaction(savepoint=True) as inner:
...         # ... operations that can be rolled back independently ...
...         inner.savepoint("checkpoint")
...         # ... operations ...
...         inner.rollback_to_savepoint("checkpoint")
...     # outer transaction continues...
Read-only transaction:
>>> with db.transaction(readonly=True) as txn:
...     results = db.table("users").select().collect()
Transaction with isolation level:
>>> with db.transaction(isolation_level="SERIALIZABLE") as txn:
...     # ... critical operations requiring highest isolation ...
...     pass
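The savepoint behavior described above corresponds to the SAVEPOINT and ROLLBACK TO statements that most SQL databases provide. The sketch below issues those statements by hand through the standard library's sqlite3 module to show that rolling back to a savepoint undoes only the work done after it, while the outer transaction still commits. It does not use Moltres, and the table and savepoint names are placeholders.

```python
import sqlite3

# isolation_level=None gives explicit control over BEGIN/COMMIT/ROLLBACK.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, label TEXT)")

conn.execute("BEGIN")                                       # outer transaction
conn.execute("INSERT INTO events (label) VALUES ('outer')")

conn.execute("SAVEPOINT sp_inner")                          # nested scope
conn.execute("INSERT INTO events (label) VALUES ('inner')")
conn.execute("ROLLBACK TO SAVEPOINT sp_inner")              # undo only the inner insert
conn.execute("RELEASE SAVEPOINT sp_inner")

conn.execute("COMMIT")                                      # outer work is kept

labels = [row[0] for row in conn.execute("SELECT label FROM events")]
```

Only the row written before the savepoint survives, which matches the "rolled back independently" wording in the nested example.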
- update(table_name: str, *, where: Column, set: Mapping[str, object]) -> int
Update rows in a table.
Convenience method for updating data in a table.
- Parameters:
table_name – Name of the table to update
where – Column expression for the WHERE clause
set – Dictionary of column names to new values
- Returns:
Number of rows updated
- Raises:
ValidationError – If table name is invalid or set dictionary is empty
Example
>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice"}], _database=db).insert_into("users")
>>> # Update rows
>>> count = db.update("users", where=col("id") == 1, set={"name": "Alice Updated"})
>>> count
1
>>> # Verify update
>>> df = db.table("users").select()
>>> results = df.collect()
>>> results[0]["name"]
'Alice Updated'
>>> db.close()
- class moltres.table.table.TableHandle(name: str, database: Database, model: Type[Any] | None = None)
Bases: object
Lightweight handle representing a table reference.
A TableHandle provides access to a specific table in the database. It can be created from a table name or from a SQLModel/Pydantic model class.
- model
Optional SQLModel, Pydantic, or SQLAlchemy model class
- Type:
Type[Any] | None
Example
>>> db = connect("sqlite:///:memory:")
>>> handle = db.table("users")
>>> df = handle.select()
- columns() -> List[str]
Get the list of column names for this table.
- Returns:
List of column names
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> handle = db.table("users")
>>> cols = handle.columns()
>>> "id" in cols
True
>>> "name" in cols
True
- property model_class: Type['DeclarativeBase'] | None
Get the SQLAlchemy model class if this handle was created from a model.
- Returns:
SQLAlchemy model class or None if handle was created from table name
- pandas(*columns: str) PandasDataFrame[source]
Create a PandasDataFrame from this table.
- Parameters:
*columns – Optional column names to select
- Returns:
PandasDataFrame for pandas-style operations
- Return type:
PandasDataFrame
Example
>>> df = db.table('users').pandas()
>>> df = db.table('users').pandas('id', 'name')
- polars(*columns: str) PolarsDataFrame[source]
Create a PolarsDataFrame from this table.
- Parameters:
*columns – Optional column names to select
- Returns:
PolarsDataFrame for Polars-style operations
- Return type:
PolarsDataFrame
Example
>>> df = db.table('users').polars()
>>> df = db.table('users').polars('id', 'name')
- select(*columns: str) DataFrame[source]
Select columns from this table.
- Parameters:
*columns – Optional column names to select. If empty, selects all columns.
- Returns:
DataFrame with selected columns
- Return type:
DataFrame
Example
>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> handle = db.table("users")
>>> df = handle.select("id", "name")
- class moltres.table.table.Transaction(database: Database, connection: sqlalchemy.engine.Connection, readonly: bool = False, isolation_level: str | None = None, is_savepoint: bool = False)[source]
Bases: object
Transaction context for grouping multiple operations.
- commit() None[source]
Explicitly commit the transaction.
- Raises:
RuntimeError – If the transaction has already been committed or rolled back
- is_active() bool[source]
Check if this transaction is still active.
- Returns:
True if the transaction is active (not committed or rolled back), False otherwise
- is_readonly() bool[source]
Check if this transaction is read-only.
- Returns:
True if the transaction is read-only, False otherwise
- isolation_level() str | None[source]
Get the transaction isolation level.
- Returns:
Isolation level string or None if not set
- release_savepoint(name: str) None[source]
Release a savepoint.
- Parameters:
name – Savepoint name to release
- Raises:
RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist
- rollback() None[source]
Explicitly roll back the transaction.
- Raises:
RuntimeError – If the transaction has already been committed or rolled back
- rollback_to_savepoint(name: str) None[source]
Roll back to a specific savepoint.
- Parameters:
name – Savepoint name to roll back to
- Raises:
RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist
- savepoint(name: str | None = None) str[source]
Create a savepoint within this transaction.
- Parameters:
name – Optional savepoint name. If not provided, a unique name is generated.
- Returns:
The savepoint name (generated or provided)
- Raises:
RuntimeError – If the transaction has already been committed or rolled back
ValueError – If savepoints are not supported by the dialect
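The savepoint methods above correspond to the standard SQL statements SAVEPOINT, ROLLBACK TO SAVEPOINT, and RELEASE SAVEPOINT. As a rough illustration of those semantics only, the sketch below uses Python's standard-library sqlite3 module directly rather than the Transaction class. The table and the savepoint name before_bob are made up for the example, and nothing here should be read as how Moltres issues these statements internally.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode, so BEGIN, COMMIT,
# and the savepoint statements below are issued explicitly by this script.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

conn.execute("BEGIN")
conn.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")

# Mark a point we can return to without abandoning the whole transaction.
conn.execute("SAVEPOINT before_bob")
conn.execute("INSERT INTO users (id, name) VALUES (2, 'Bob')")

# Undo only the work done since the savepoint; Alice's row is kept.
conn.execute("ROLLBACK TO SAVEPOINT before_bob")
# Releasing removes the savepoint from the transaction's savepoint stack.
conn.execute("RELEASE SAVEPOINT before_bob")
conn.execute("COMMIT")

names = [row[0] for row in conn.execute("SELECT name FROM users ORDER BY id")]
print(names)
conn.close()
```

Only the insert made after the savepoint is discarded; the work done before it is committed with the outer transaction.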
QueryExecutor
Query execution helpers (moltres facade over moltres_core.sql).
Translates moltres_core.exceptions into moltres.utils.exceptions so
framework integrations (FastAPI, etc.) keep matching registered handlers.
- class moltres.engine.execution.QueryExecutor(connection_manager: ConnectionManager, config: EngineConfig)[source]
Bases: QueryExecutor
Same as moltres_core.sql.execution.QueryExecutor with public Moltres exceptions.
- execute(sql: str, params: Dict[str, Any] | None = None, transaction: Any = None) QueryResult[source]
Execute a non-SELECT SQL statement (INSERT, UPDATE, DELETE, etc.).
- Parameters:
sql – The SQL statement to execute
params – Optional parameter dictionary for parameterized queries
transaction – Optional transaction connection to use (if None, uses auto-commit)
- Returns:
QueryResult with rowcount of affected rows
- Raises:
ExecutionError – If SQL execution fails
- execute_many(sql: str, params_list: Sequence[Dict[str, Any]], transaction: Any = None) QueryResult[source]
Execute a SQL statement multiple times with different parameter sets.
This is more efficient than calling execute() in a loop for batch inserts.
- Parameters:
sql – The SQL statement to execute
params_list – Sequence of parameter dictionaries, one per execution
transaction – Optional transaction connection to use (if None, uses auto-commit)
- Returns:
QueryResult with total rowcount across all executions
- Raises:
ExecutionError – If SQL execution fails
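To make the shape of params_list concrete, here is a minimal sketch of the same idea (one statement, a sequence of parameter dictionaries) using the standard-library sqlite3 module instead of QueryExecutor. The table, column names, and the :id/:name placeholder style are assumptions chosen for the example; the placeholder syntax that execute_many() expects may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# One dictionary per execution, keyed by the named placeholders in the SQL.
params_list = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
    {"id": 3, "name": "Carol"},
]

# The statement is prepared once and run for every parameter set.
conn.executemany(
    "INSERT INTO users (id, name) VALUES (:id, :name)",
    params_list,
)
conn.commit()

inserted = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(inserted)
conn.close()
```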
- fetch(stmt: str | Any, params: Dict[str, Any] | None = None, connection: Any = None, model: Type[Any] | None = None) QueryResult[source]
Execute a SELECT query and return results.
- Parameters:
stmt – The SQLAlchemy Select statement or SQL string to execute
params – Optional parameter dictionary for parameterized queries (only used with SQL strings)
connection – Optional SQLAlchemy Connection to use. If provided, uses this connection directly instead of creating a new one. Useful for executing within existing transactions. The connection’s lifecycle is not managed by this method.
- Returns:
QueryResult containing rows and rowcount
- Raises:
ExecutionError – If SQL execution fails
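The params argument applies when stmt is a SQL string. As an illustration of a parameterized SELECT in general, and not of fetch() itself, the sketch below runs one through the standard-library sqlite3 module. The table, its data, and the :active placeholder are assumptions for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows can then be converted to dictionaries
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, active INTEGER)"
)
conn.executemany(
    "INSERT INTO users (id, name, active) VALUES (?, ?, ?)",
    [(1, "Alice", 1), (2, "Bob", 0)],
)

# Values are bound by the driver, never interpolated into the SQL text.
rows = conn.execute(
    "SELECT id, name FROM users WHERE active = :active",
    {"active": 1},
).fetchall()

result = [dict(row) for row in rows]
print(result)
conn.close()
```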
- class moltres.engine.execution.QueryResult(rows: 'Optional[ResultRows]', rowcount: 'Optional[int]')[source]
Bases: object
- moltres.engine.execution.register_performance_hook(event: str, callback: Callable[[str, float, dict[str, Any]], None]) None[source]
Register a performance monitoring hook.
- Parameters:
event – Event type - “query_start” or “query_end”
callback – Callback function that receives (sql, elapsed_time, metadata)
Example
>>> from moltres.engine.execution import register_performance_hook
>>> def log_slow_queries(sql: str, elapsed: float, metadata: dict):
...     if elapsed > 1.0:
...         print(f"Slow query ({elapsed:.2f}s): {sql[:100]}")
...
>>> register_performance_hook("query_end", log_slow_queries)
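A callback can also accumulate data instead of printing. The sketch below defines one with the documented (sql, elapsed_time, metadata) signature and calls it by hand so that it runs without a database. The 0.5-second threshold, the slow_queries list, and the sample SQL strings are illustrative assumptions; in real use the function would be passed to register_performance_hook("query_end", ...).

```python
from typing import Any

# Collected (sql, elapsed) pairs for queries over the threshold.
slow_queries: list[tuple[str, float]] = []

SLOW_THRESHOLD_SECONDS = 0.5  # illustrative value, not a Moltres default


def collect_slow_queries(sql: str, elapsed: float, metadata: dict[str, Any]) -> None:
    """Record queries that took longer than the threshold."""
    if elapsed > SLOW_THRESHOLD_SECONDS:
        slow_queries.append((sql, elapsed))


# Invoked manually here to show the behaviour of the callback in isolation.
collect_slow_queries("SELECT 1", 0.01, {})
collect_slow_queries("SELECT * FROM big_table", 2.3, {})

print(slow_queries)
```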
ConnectionManager
Synchronous SQLAlchemy connection helpers (provided by moltres_core.sql).
- class moltres.engine.connection.ConnectionManager(config: EngineConfig)[source]
Bases: object
Creates and caches SQLAlchemy engines for Moltres sessions.
- property active_transaction: sqlalchemy.engine.Connection | None
Get the active transaction connection if one exists.
- begin_transaction(savepoint: bool = False, readonly: bool = False, isolation_level: str | None = None, timeout: float | None = None) sqlalchemy.engine.Connection[source]
Begin a new transaction and return the connection.
- Parameters:
savepoint – If True and a transaction is already active, create a savepoint instead.
readonly – If True, set transaction to read-only mode.
isolation_level – Optional isolation level (READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ, SERIALIZABLE).
timeout – Optional transaction timeout in seconds.
- Returns:
Connection that is part of a transaction (not auto-committed)
- Raises:
RuntimeError – If savepoint=False and a transaction is already active.
ValueError – If isolation level or readonly is requested but not supported by dialect.
- commit_transaction(connection: sqlalchemy.engine.Connection) None[source]
Commit a transaction.
- Parameters:
connection – The transaction connection to commit
- connect(transaction: sqlalchemy.engine.Connection | None = None) Iterator[sqlalchemy.engine.Connection][source]
Get a database connection.
- Parameters:
transaction – If provided, use this transaction connection instead of creating a new one. This allows operations to share a transaction. If None and an active transaction exists, uses the active transaction.
- Yields:
Database connection
- create_savepoint(connection: sqlalchemy.engine.Connection, name: str) sqlalchemy.engine.Connection[source]
Create a savepoint in the current transaction.
- Parameters:
connection – The transaction connection
name – Savepoint name
- Returns:
The same connection (for compatibility)
- Raises:
RuntimeError – If no transaction is active or connection doesn’t match active transaction.
- property engine: sqlalchemy.engine.Engine
- release_savepoint(connection: sqlalchemy.engine.Connection, name: str) None[source]
Release a savepoint.
- Parameters:
connection – The transaction connection
name – Savepoint name to release
- Raises:
RuntimeError – If no transaction is active, connection doesn’t match, or savepoint not found.
- rollback_to_savepoint(connection: sqlalchemy.engine.Connection, name: str) None[source]
Roll back to a specific savepoint.
- Parameters:
connection – The transaction connection
name – Savepoint name to roll back to
- Raises:
RuntimeError – If no transaction is active, connection doesn’t match, or savepoint not found.
- rollback_transaction(connection: sqlalchemy.engine.Connection) None[source]
Roll back a transaction.
- Parameters:
connection – The transaction connection to roll back
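The begin_transaction(), commit_transaction(), and rollback_transaction() methods follow the usual begin, then commit-or-roll-back pattern. The sketch below shows that pattern with the standard-library sqlite3 module rather than ConnectionManager. The accounts table, its CHECK constraint, and the transfer() helper are hypothetical and exist only to provoke a failure partway through a transaction.

```python
import sqlite3

# Autocommit mode, so the transaction boundaries below are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE accounts ("
    "id INTEGER PRIMARY KEY, "
    "balance INTEGER NOT NULL CHECK (balance >= 0))"
)
conn.execute("INSERT INTO accounts (id, balance) VALUES (1, 100), (2, 0)")


def transfer(amount: int) -> bool:
    """Move amount from account 1 to account 2, all or nothing."""
    conn.execute("BEGIN")
    try:
        conn.execute(
            "UPDATE accounts SET balance = balance + ? WHERE id = 2", (amount,)
        )
        # Fails the CHECK constraint if account 1 would go negative.
        conn.execute(
            "UPDATE accounts SET balance = balance - ? WHERE id = 1", (amount,)
        )
    except sqlite3.IntegrityError:
        conn.execute("ROLLBACK")  # also undoes the credit to account 2
        return False
    conn.execute("COMMIT")
    return True


failed = transfer(150)    # exceeds the balance of account 1
succeeded = transfer(40)  # within the balance of account 1

balances = [
    row[0] for row in conn.execute("SELECT balance FROM accounts ORDER BY id")
]
print(failed, succeeded, balances)
conn.close()
```

The failed transfer leaves both balances untouched because the first UPDATE is rolled back together with the second.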