Engine API

Connection and execution management. Use these helpers when you need lower-level control than the high-level Database and TableHandle APIs.

Database

Table access primitives.

This module provides the core database and table access functionality:

  • Database - Main database connection and query interface

  • TableHandle - Lightweight reference to a database table

  • Transaction - Transaction context manager

The Database class is the primary entry point for all database operations, including table creation, querying, and data mutations.

class moltres.table.table.Database(config: MoltresConfig)[source]

Bases: object

Entry-point object returned by moltres.connect().

The Database class provides the main interface for all database operations in Moltres. It handles connections, query execution, table management, and data mutations.

The Database class supports the context manager protocol for automatic connection cleanup. Use it in a with statement to ensure the connection is closed on exit.

config

The MoltresConfig instance used for this database

dialect

The SQL dialect being used (e.g., “sqlite”, “postgresql”)

Example

Basic usage:

>>> from moltres import connect, col
>>> db = connect("sqlite:///example.db")
>>> df = db.table("users").select().where(col("active") == True)
>>> results = df.collect()
>>> db.close()

Using context manager (recommended):

>>> with connect("sqlite:///example.db") as db:
...     df = db.table("users").select().where(col("active") == True)
...     results = df.collect()
...     # db.close() called automatically on exit
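
The cleanup guarantee described above comes from the standard Python context manager protocol. The following is a minimal sketch of that protocol, not Moltres code: ManagedConnection is a hypothetical stand-in built on the standard-library sqlite3 module, and it only illustrates why close() runs even when the body raises.

```python
import sqlite3


class ManagedConnection:
    """Hypothetical stand-in for a connection wrapper; not Moltres code."""

    def __init__(self, path: str) -> None:
        self.conn = sqlite3.connect(path)
        self.closed = False

    def close(self) -> None:
        # Idempotent close, so calling it twice is harmless.
        if not self.closed:
            self.conn.close()
            self.closed = True

    def __enter__(self) -> "ManagedConnection":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Always release the connection, even when the body raised.
        self.close()
        return False  # do not swallow the exception


with ManagedConnection(":memory:") as managed:
    value = managed.conn.execute("SELECT 1 + 1").fetchone()[0]

# Cleanup also happens when the body raises.
failing = ManagedConnection(":memory:")
try:
    with failing:
        raise RuntimeError("boom")
except RuntimeError:
    pass
```

Returning False from __exit__ lets the original exception propagate, which is usually what callers of a database wrapper expect.
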
batch() OperationBatch[source]

Create a batch context for grouping multiple operations.

All operations within the batch context are executed together in a single transaction when the context exits. If any exception occurs, all operations are rolled back.

Returns:

OperationBatch context manager

Example

>>> with db.batch():
...     db.create_table("users", [...])
...     # All operations execute together on exit
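
The all-or-nothing behaviour described above can be sketched as a queue of statements that is flushed inside one transaction when the context exits. This is an illustration under stated assumptions, not the OperationBatch implementation: SimpleBatch and its add() method are hypothetical names, and sqlite3 from the standard library stands in for the real engine.

```python
import sqlite3


class SimpleBatch:
    """Hypothetical batch: queue statements, run them in one transaction on exit."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.pending = []

    def add(self, sql: str, params: tuple = ()) -> None:
        self.pending.append((sql, params))  # nothing runs yet

    def __enter__(self) -> "SimpleBatch":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is not None:
            self.pending.clear()  # the body failed: discard queued work
            return False
        self.conn.execute("BEGIN")
        try:
            for sql, params in self.pending:
                self.conn.execute(sql, params)
        except Exception:
            self.conn.execute("ROLLBACK")  # undo everything run so far
            raise
        self.conn.execute("COMMIT")
        return False


# isolation_level=None: autocommit mode, so BEGIN/COMMIT above are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

with SimpleBatch(conn) as batch:
    batch.add("INSERT INTO users VALUES (?, ?)", (1, "Alice"))
    batch.add("INSERT INTO users VALUES (?, ?)", (2, "Bob"))

# The second statement violates the primary key, so the whole batch rolls back.
try:
    with SimpleBatch(conn) as batch:
        batch.add("INSERT INTO users VALUES (?, ?)", (3, "Carol"))
        batch.add("INSERT INTO users VALUES (?, ?)", (1, "Duplicate"))
except sqlite3.IntegrityError:
    pass

row_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```
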
close() None[source]

Close all database connections and dispose of the engine.

This should be called when done with the database connection, especially for ephemeral test databases.

Note: After calling close(), the Database instance should not be used.

compile_plan(plan: LogicalPlan) Select[source]

Compile a logical plan to a SQLAlchemy Select statement.

property connection_manager: ConnectionManager

Get the connection manager for this database.

Returns:

The connection manager instance

Return type:

ConnectionManager

createDataFrame(data: Sequence[dict[str, Any]] | Sequence[tuple] | Records | 'LazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) DataFrame[source]

Create a DataFrame from Python data.

Delegates to EphemeralTableManager.

Creates a temporary table, inserts the data, and returns a DataFrame that queries that table. A LazyRecords input is materialized automatically. A pandas DataFrame, polars DataFrame, or polars LazyFrame is converted to Records lazily.

Parameters:
  • data – Input data in one of the supported formats:

      - List of dicts: [{“col1”: val1, “col2”: val2}, …]
      - List of tuples: Requires schema parameter with column names
      - Records object: Extracts data and schema if available
      - LazyRecords object: Auto-materializes and extracts data and schema
      - pandas DataFrame: Converts to Records with schema preservation
      - polars DataFrame: Converts to Records with schema preservation
      - polars LazyFrame: Materializes and converts to Records with schema preservation

  • schema – Optional explicit schema. If not provided, schema is inferred from data.

  • pk – Optional column name(s) to mark as primary key. Can be a single string or sequence of strings for composite keys.

  • auto_pk – Optional column name(s) to create as auto-incrementing primary key. Can specify same name as pk to make an existing column auto-incrementing.

Returns:

DataFrame querying from the created temporary table

Raises:
  • ValueError – If data is empty and no schema provided, or if primary key requirements are not met

  • ValidationError – If list of tuples provided without schema, or other validation errors

Example

>>> # Create DataFrame from list of dicts
>>> df = db.createDataFrame([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], pk="id")
>>> # Create DataFrame with auto-incrementing primary key
>>> df = db.createDataFrame([{"name": "Alice"}, {"name": "Bob"}], auto_pk="id")
>>> # Create DataFrame from Records
>>> from moltres.io.records import Records
>>> records = Records(_data=[{"id": 1, "name": "Alice"}], _database=db)
>>> df = db.createDataFrame(records, pk="id")
>>> # Create DataFrame from LazyRecords (auto-materializes)
>>> lazy_records = db.read.records.csv("data.csv")
>>> df = db.createDataFrame(lazy_records, pk="id")
>>> # Create DataFrame from pandas DataFrame
>>> import pandas as pd
>>> pdf = pd.DataFrame([{"id": 1, "name": "Alice"}])
>>> df = db.createDataFrame(pdf, pk="id")
>>> # Create DataFrame from polars DataFrame
>>> import polars as pl
>>> plf = pl.DataFrame([{"id": 1, "name": "Alice"}])
>>> df = db.createDataFrame(plf, pk="id")
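
The three steps described above (create a temporary table, insert the data, query it) can be sketched with the standard-library sqlite3 module. This is not how EphemeralTableManager is implemented: the table name tmp_people, the type_map lookup, and the first-row schema inference are assumptions made for the illustration.

```python
import sqlite3

rows = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
pk = "id"  # plays the role of the pk argument

# Naive schema inference (an assumption for this sketch): map the Python
# types found in the first row to SQLite column types.
type_map = {int: "INTEGER", float: "REAL", str: "TEXT"}
columns = [(key, type_map.get(type(value), "TEXT")) for key, value in rows[0].items()]

conn = sqlite3.connect(":memory:")
column_sql = ", ".join(
    f'"{name}" {sql_type}' + (" PRIMARY KEY" if name == pk else "")
    for name, sql_type in columns
)
conn.execute(f"CREATE TEMPORARY TABLE tmp_people ({column_sql})")

# Named placeholders let each dict be bound directly as a row.
placeholders = ", ".join(f":{name}" for name, _ in columns)
conn.executemany(f"INSERT INTO tmp_people VALUES ({placeholders})", rows)

result = conn.execute("SELECT id, name FROM tmp_people ORDER BY id").fetchall()
```
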
create_dataframe(data: Sequence[dict[str, object]] | Sequence[tuple] | Records | 'LazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) DataFrame[source]

Create a DataFrame from Python data (snake_case alias for createDataFrame).

This is an alias for createDataFrame(). See createDataFrame() for full documentation.

Parameters:
  • data – Input data in one of the supported formats

  • schema – Optional explicit schema

  • pk – Optional column name(s) to mark as primary key

  • auto_pk – Optional column name(s) to create as auto-incrementing primary key

Returns:

DataFrame querying from the created temporary table

create_index(name: str, table: str, columns: str | Sequence[str], *, unique: bool = False, if_not_exists: bool = True) CreateIndexOperation[source]

Create a lazy create index operation.

Delegates to DDLManager.

Parameters:
  • name – Name of the index to create

  • table – Name of the table to create the index on

  • columns – Column name(s) to index (single string or sequence)

  • unique – If True, create a UNIQUE index (default: False)

  • if_not_exists – If True, don’t error if index already exists (default: True)

Returns:

CreateIndexOperation that executes on collect()

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("email", "TEXT"), column("name", "TEXT"), column("age", "INTEGER")]).collect()
>>> # Create single-column index
>>> op = db.create_index("idx_email", "users", "email")
>>> op.collect()  # Executes the CREATE INDEX
>>> # Multi-column index
>>> op2 = db.create_index("idx_name_age", "users", ["name", "age"], unique=True)
>>> op2.collect()
>>> db.close()
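
Because create_index() returns a lazy operation, nothing reaches the database until collect() is called. The deferred-execution pattern can be sketched as follows. LazyStatement and index_names are hypothetical helpers for this illustration, and sqlite3 stands in for the real engine.

```python
import sqlite3


class LazyStatement:
    """Hypothetical deferred DDL statement; runs only when collect() is called."""

    def __init__(self, conn: sqlite3.Connection, sql: str) -> None:
        self.conn = conn
        self.sql = sql

    def collect(self) -> None:
        self.conn.execute(self.sql)


def index_names(conn: sqlite3.Connection) -> list:
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'index'")
    return [name for (name,) in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, email TEXT)")

op = LazyStatement(conn, "CREATE UNIQUE INDEX IF NOT EXISTS idx_email ON users (email)")
before = index_names(conn)  # building the operation ran nothing
op.collect()
after = index_names(conn)   # the index exists only after collect()
op.collect()                # IF NOT EXISTS makes a repeat call a no-op
```
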
create_table(name: str, columns: Sequence[ColumnDef], *, if_not_exists: bool = True, temporary: bool = False, constraints: Sequence['UniqueConstraint' | 'CheckConstraint' | 'ForeignKeyConstraint'] | None = None) CreateTableOperation[source]
create_table(model_class: Type['DeclarativeBase'], *, if_not_exists: bool = True, temporary: bool = False) CreateTableOperation

Create a lazy create table operation.

Delegates to DDLManager.

Parameters:
  • name_or_model – Name of the table to create, or SQLAlchemy model class

  • columns – Sequence of ColumnDef objects defining the table schema (required if name_or_model is str)

  • if_not_exists – If True, don’t error if table already exists (default: True)

  • temporary – If True, create a temporary table (default: False)

  • constraints – Optional sequence of constraint objects (UniqueConstraint, CheckConstraint, ForeignKeyConstraint). Ignored if model_class is provided (constraints are extracted from model).

Returns:

CreateTableOperation that executes on collect()

Raises:
  • ValidationError – If table name or columns are invalid

  • ValueError – If model_class is not a valid SQLAlchemy model

Example

>>> from moltres import connect
>>> from moltres.table.schema import column, unique, check
>>> db = connect("sqlite:///:memory:")
>>> # Create table with constraints
>>> op = db.create_table(
...     "users",
...     [column("id", "INTEGER", primary_key=True), column("email", "TEXT")],
...     constraints=[unique("email"), check("id > 0", name="ck_positive_id")]
... )
>>> table = op.collect()  # Executes the CREATE TABLE
>>> # Verify table was created
>>> tables = db.get_table_names()
>>> "users" in tables
True
>>> db.close()
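
To show what the unique and check constraints in the example enforce, the sketch below writes comparable DDL by hand and runs it through the standard-library sqlite3 module. The exact SQL that Moltres emits may differ; the statement here is an assumption chosen to mirror the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY,
        email TEXT,
        UNIQUE (email),
        CONSTRAINT ck_positive_id CHECK (id > 0)
    )
    """
)
conn.execute("INSERT INTO users (id, email) VALUES (1, 'alice@example.com')")

# The first row repeats an email, the second has a non-positive id.
violations = []
for row in [(2, "alice@example.com"), (-5, "bob@example.com")]:
    try:
        conn.execute("INSERT INTO users (id, email) VALUES (?, ?)", row)
    except sqlite3.IntegrityError as exc:
        violations.append(str(exc))

stored = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

Both rejected inserts raise sqlite3.IntegrityError, and only the original row remains in the table.
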
delete(table_name: str, *, where: Column) int[source]

Delete rows from a table.

Convenience method for deleting data from a table.

Parameters:
  • table_name – Name of the table to delete from

  • where – Column expression for the WHERE clause

Returns:

Number of rows deleted

Raises:

ValidationError – If table name is invalid

Example

>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], _database=db).insert_into("users")
>>> # Delete rows
>>> count = db.delete("users", where=col("id") == 1)
>>> count
1
>>> # Verify deletion
>>> df = db.table("users").select()
>>> results = df.collect()
>>> len(results)
1
>>> results[0]["name"]
'Bob'
>>> db.close()
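
The integer returned by delete() is the number of rows the statement affected. A minimal sketch of the equivalent operation with the standard-library sqlite3 module follows. It assumes nothing about Moltres internals and only illustrates the bound predicate and the affected-row count.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "Alice"), (2, "Bob")])

# Bind the predicate value as a parameter instead of formatting it into the SQL.
cursor = conn.execute("DELETE FROM users WHERE id = :id", {"id": 1})
deleted = cursor.rowcount  # number of rows the DELETE affected

remaining = conn.execute("SELECT name FROM users").fetchall()
```
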
property dialect: DialectSpec
drop_index(name: str, table: str | None = None, *, if_exists: bool = True) DropIndexOperation[source]

Create a lazy drop index operation.

Parameters:
  • name – Name of the index to drop

  • table – Optional table name (required for some dialects like MySQL)

  • if_exists – If True, don’t error if index doesn’t exist (default: True)

Returns:

DropIndexOperation that executes on collect()

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("email", "TEXT")]).collect()
>>> db.create_index("idx_email", "users", "email").collect()
>>> # Drop index
>>> op = db.drop_index("idx_email", "users")
>>> op.collect()  # Executes the DROP INDEX
>>> db.close()
drop_table(name: str, *, if_exists: bool = True) DropTableOperation[source]

Create a lazy drop table operation.

Delegates to DDLManager.

Parameters:
  • name – Name of the table to drop

  • if_exists – If True, don’t error if table doesn’t exist (default: True)

Returns:

DropTableOperation that executes on collect()

Example

>>> op = db.drop_table("users")
>>> op.collect()  # Executes the DROP TABLE
execute_plan(plan: LogicalPlan, model: Type[Any] | None = None) QueryResult[source]

Execute a logical plan and return results.

Delegates to DatabaseQueryExecutor.

execute_plan_stream(plan: LogicalPlan) Iterator[List[Dict[str, object]]][source]

Execute a plan and return an iterator of row chunks.

Delegates to DatabaseQueryExecutor.
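
Returning an iterator of row chunks keeps memory bounded for large results. One way to produce that shape is a generator over cursor.fetchmany(), sketched here with the standard-library sqlite3 module. The stream_chunks helper and its chunk_size parameter are hypothetical and are not part of the Moltres API.

```python
import sqlite3
from typing import Dict, Iterator, List


def stream_chunks(
    conn: sqlite3.Connection, sql: str, chunk_size: int = 2
) -> Iterator[List[Dict[str, object]]]:
    """Yield query results as lists of row dicts, chunk_size rows at a time."""
    cursor = conn.execute(sql)
    names = [description[0] for description in cursor.description]
    while True:
        batch = cursor.fetchmany(chunk_size)
        if not batch:
            break
        yield [dict(zip(names, row)) for row in batch]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE numbers (n INTEGER)")
conn.executemany("INSERT INTO numbers VALUES (?)", [(i,) for i in range(5)])

chunks = list(stream_chunks(conn, "SELECT n FROM numbers ORDER BY n"))
chunk_sizes = [len(chunk) for chunk in chunks]
```
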

execute_sql(sql: str, params: Dict[str, Any] | None = None) QueryResult[source]

Execute a raw SQL query.

Delegates to DatabaseQueryExecutor.

property executor: QueryExecutor

Get the query executor for this database.

Returns:

The query executor instance

Return type:

QueryExecutor

explain(sql: str, params: Dict[str, Any] | None = None) str[source]

Get the execution plan for a SQL query.

Delegates to DatabaseQueryExecutor.

Parameters:
  • sql – SQL query string

  • params – Optional query parameters

Returns:

Execution plan as a string (dialect-specific)

Example

>>> plan = db.explain("SELECT * FROM users WHERE id = :id", params={"id": 1})
>>> print(plan)
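
The plan text is dialect-specific. As an illustration of what such output can look like on SQLite, the sketch below runs EXPLAIN QUERY PLAN directly through the standard-library sqlite3 module with the same named parameter. It does not call Moltres, and the exact wording of the plan varies between SQLite versions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

rows = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM users WHERE id = :id", {"id": 1}
).fetchall()

# Each row ends with a human-readable detail string; join them into one plan.
plan = "\n".join(str(row[-1]) for row in rows)
print(plan)
```
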
classmethod from_connection(connection: sqlalchemy.engine.Connection, **options: object) Database[source]

Create a Database instance from an existing SQLAlchemy Connection.

This allows you to use Moltres with an existing SQLAlchemy Connection, enabling integration within existing transactions.

Note: The Database will use the Connection’s engine, but will not manage the Connection’s lifecycle. The user is responsible for managing the connection.

Parameters:
  • connection – SQLAlchemy Connection instance

  • **options – Optional configuration parameters (same as from_engine)

Returns:

Database instance configured to use the Connection’s engine

Example

>>> from sqlalchemy import create_engine
>>> from moltres import Database
>>> engine = create_engine("sqlite:///:memory:")
>>> with engine.connect() as conn:
...     db = Database.from_connection(conn)
...     # Use Moltres within the connection's transaction
...     from moltres.table.schema import column
...     _ = db.create_table("users", [column("id", "INTEGER")]).collect()
classmethod from_engine(engine: Engine, **options: object) Database[source]

Create a Database instance from an existing SQLAlchemy Engine.

This allows you to use Moltres with an existing SQLAlchemy Engine, enabling integration with existing SQLAlchemy projects.

Parameters:
  • engine – SQLAlchemy Engine instance

  • **options – Optional configuration parameters:

      - echo: Enable SQLAlchemy echo mode
      - fetch_format: Result format: “records”, “pandas”, or “polars”
      - dialect: Override SQL dialect detection
      - query_timeout: Query execution timeout in seconds
      - Other options are stored in config.options

Returns:

Database instance configured to use the provided Engine

Return type:

Database

Example

>>> from sqlalchemy import create_engine
>>> from moltres import Database
>>> engine = create_engine("sqlite:///:memory:")
>>> db = Database.from_engine(engine)
>>> # Now use Moltres with your existing engine
>>> from moltres.table.schema import column
>>> _ = db.create_table("users", [column("id", "INTEGER")]).collect()
classmethod from_session(session: Session, **options: object) Database[source]

Create a Database instance from a SQLAlchemy ORM Session.

This allows you to use Moltres with an existing SQLAlchemy ORM Session, enabling integration with ORM-based applications.

Note: The Database will use the Session’s bind/engine, but will not manage the Session’s lifecycle. The user is responsible for managing the session.

Parameters:
  • session – SQLAlchemy ORM Session instance

  • **options – Optional configuration parameters (same as from_engine)

Returns:

Database instance configured to use the Session’s bind/engine

Example

>>> from sqlalchemy import create_engine
>>> from sqlalchemy.orm import sessionmaker
>>> from moltres import Database
>>> engine = create_engine("sqlite:///:memory:")
>>> Session = sessionmaker(bind=engine)
>>> with Session() as session:
...     db = Database.from_session(session)
...     # Use Moltres with your existing session
...     from moltres.table.schema import column
...     _ = db.create_table("users", [column("id", "INTEGER")]).collect()
get_columns(table_name: str) List['ColumnInfo'][source]

Get column information for a table.

Parameters:

table_name – Name of the table to inspect

Returns:

List of ColumnInfo objects with column metadata

Raises:
  • ValidationError – If table name is invalid

  • ValueError – If database connection is not available

  • RuntimeError – If table does not exist or cannot be inspected

Example

>>> columns = db.get_columns("users")
>>> # Returns: [ColumnInfo(name='id', type_name='INTEGER', ...), ...]
get_table_names(schema: str | None = None) List[str][source]

Get list of table names in the database.

Parameters:

schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

Returns:

List of table names

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER")]).collect()
>>> db.create_table("orders", [column("id", "INTEGER")]).collect()
>>> # Get all table names
>>> tables = db.get_table_names()
>>> "users" in tables
True
>>> "orders" in tables
True
>>> db.close()
get_transaction_status() dict[str, object] | None[source]

Get transaction status and metadata if a transaction is active.

Returns:

Dictionary with transaction metadata, or None if no transaction is active. The dictionary includes:

  • readonly: Whether the transaction is read-only

  • isolation_level: Transaction isolation level (if set)

  • timeout: Transaction timeout in seconds (if set)

  • savepoints: List of active savepoint names

Return type:

dict[str, object] | None

Example

>>> with db.transaction(isolation_level="SERIALIZABLE", readonly=True) as txn:
...     status = db.get_transaction_status()
...     assert status["isolation_level"] == "SERIALIZABLE"
...     assert status["readonly"] is True
get_view_names(schema: str | None = None) List[str][source]

Get list of view names in the database.

Parameters:

schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

Returns:

List of view names

Example

>>> views = db.get_view_names()
>>> # Returns: ['active_users_view', 'order_summary_view']
insert(table_name: str, rows: Sequence[Mapping[str, object]] | 'Records' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame') int[source]

Insert rows into a table.

Delegates to TableManager.

Parameters:
  • table_name – Name of the table to insert into

  • rows – Sequence of row dictionaries, Records, pandas DataFrame, polars DataFrame, or polars LazyFrame

Returns:

Number of rows inserted

Raises:

ValidationError – If table name is invalid or rows are empty

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> # Insert rows
>>> count = db.insert("users", [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}])
>>> count
2
>>> # Verify insertion
>>> df = db.table("users").select()
>>> results = df.collect()
>>> len(results)
2
>>> results[0]["name"]
'Alice'
>>> db.close()
is_in_transaction() bool[source]

Check if currently in a transaction.

Returns:

True if a transaction is active, False otherwise

Example

>>> if db.is_in_transaction():
...     status = db.get_transaction_status()
...     print(f"Isolation: {status['isolation_level']}")
property load: DataLoader

Return a DataLoader for loading data from files and tables as DataFrames.

Note: For SQL operations on tables, use db.table(name).select() instead.

merge(table_name: str, rows: Sequence[Mapping[str, object]] | 'Records' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', *, on: Sequence[str], when_matched: Mapping[str, object] | None = None, when_not_matched: Mapping[str, object] | None = None) int[source]

Merge (upsert) rows into a table.

Convenience method for merging data into a table with conflict resolution.

Parameters:
  • table_name – Name of the table to merge into

  • rows – Sequence of row dictionaries, Records, pandas DataFrame, polars DataFrame, or polars LazyFrame

  • on – Sequence of column names that form the conflict key

  • when_matched – Optional dictionary of column updates when a conflict occurs

  • when_not_matched – Optional dictionary of default values when inserting new rows

Returns:

Number of rows affected (inserted or updated)

Raises:

ValidationError – If table name is invalid, rows are empty, or on columns are invalid

Example

>>> db.merge(
...     "users",
...     [{"id": 1, "name": "Alice", "email": "alice@example.com"}],
...     on=["id"],
...     when_matched={"name": "Alice Updated"}
... )
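
Many databases implement upsert semantics with an INSERT ... ON CONFLICT DO UPDATE statement. The sketch below shows that form on SQLite (3.24 or newer) through the standard-library sqlite3 module. It is an assumption about one possible translation, not the SQL that merge() is documented to emit. Note one difference: this sketch copies the incoming name on conflict, whereas when_matched supplies explicit column values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'Alice', 'alice@example.com')")

# "excluded" refers to the row that failed to insert because of the conflict.
upsert_sql = """
    INSERT INTO users (id, name, email) VALUES (:id, :name, :email)
    ON CONFLICT (id) DO UPDATE SET name = excluded.name
"""
incoming = [
    {"id": 1, "name": "Alice Updated", "email": "ignored@example.com"},  # conflict
    {"id": 2, "name": "Bob", "email": "bob@example.com"},  # new row
]
conn.executemany(upsert_sql, incoming)

rows = conn.execute("SELECT id, name, email FROM users ORDER BY id").fetchall()
```

Only the name column is updated on conflict, so the existing email for id 1 is kept while id 2 is inserted as a new row.
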
property read: ReadAccessor

Return a ReadAccessor for accessing read operations.

Use db.read.records.* for Records-based reads (backward compatibility). Use db.load.* for DataFrame-based reads (PySpark-style).

reflect(schema: str | None = None, views: bool = False) Dict[str, 'TableSchema'][source]

Reflect entire database schema.

Parameters:
  • schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

  • views – If True, also reflect views (default: False)

Returns:

Dictionary mapping table/view names to TableSchema objects

Example

>>> schemas = db.reflect()
>>> # Returns: {'users': TableSchema(...), 'orders': TableSchema(...)}
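
Schema reflection amounts to asking the database catalog which tables exist and what columns they have. A rough SQLite-only sketch using the standard-library sqlite3 module follows. The reflect function here is a hypothetical helper that returns plain (column, type) pairs rather than TableSchema objects, and it is not the Moltres implementation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("CREATE TABLE orders (id INTEGER, amount REAL)")
conn.execute("CREATE VIEW big_orders AS SELECT * FROM orders WHERE amount > 100")


def reflect(conn: sqlite3.Connection, views: bool = False) -> dict:
    """Map each table (and optionally view) name to (column, type) pairs."""
    kinds = ("table", "view") if views else ("table",)
    marks = ", ".join("?" for _ in kinds)
    names = [
        name
        for (name,) in conn.execute(
            f"SELECT name FROM sqlite_master WHERE type IN ({marks}) ORDER BY name",
            kinds,
        )
    ]
    schema = {}
    for name in names:
        # PRAGMA table_info yields (cid, name, type, notnull, dflt_value, pk).
        info = conn.execute(f'PRAGMA table_info("{name}")').fetchall()
        schema[name] = [(column[1], column[2]) for column in info]
    return schema


tables_only = reflect(conn)
with_views = reflect(conn, views=True)
```
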
reflect_table(name: str, schema: str | None = None) TableSchema[source]

Reflect a single table from the database.

Parameters:
  • name – Name of the table to reflect

  • schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

Returns:

TableSchema object with table metadata

Raises:
  • ValidationError – If table name is invalid

  • ValueError – If database connection is not available

  • RuntimeError – If table does not exist or reflection fails

Example

>>> schema = db.reflect_table("users")
>>> # Returns: TableSchema(name='users', columns=[ColumnDef(...), ...])
scan_csv(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) PolarsDataFrame[source]

Scan a CSV file as a PolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the CSV file

  • schema – Optional explicit schema

  • **options – Format-specific options (e.g., header=True, delimiter=”,”)

Returns:

PolarsDataFrame containing the CSV data (lazy)

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_csv("data.csv", header=True)
>>> results = df.collect()
scan_json(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) PolarsDataFrame[source]

Scan a JSON file (array of objects) as a PolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the JSON file

  • schema – Optional explicit schema

  • **options – Format-specific options (e.g., multiline=True)

Returns:

PolarsDataFrame containing the JSON data (lazy)

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_json("data.json")
>>> results = df.collect()
scan_jsonl(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) PolarsDataFrame[source]

Scan a JSONL file (one JSON object per line) as a PolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the JSONL file

  • schema – Optional explicit schema

  • **options – Format-specific options

Returns:

PolarsDataFrame containing the JSONL data (lazy)

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_jsonl("data.jsonl")
>>> results = df.collect()
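
The difference between the two input layouts (a single JSON array for scan_json, one object per line for scan_jsonl) can be sketched with the standard library alone. The iter_jsonl helper is hypothetical and only illustrates why the line-delimited form lends itself to lazy reading. It makes no claim about how Moltres parses files.

```python
import io
import json
from typing import Dict, Iterator


def iter_jsonl(stream: io.TextIOBase) -> Iterator[Dict[str, object]]:
    """Lazily yield one dict per non-empty line of a JSONL stream."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


json_text = '[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]'
jsonl_text = '{"id": 1, "name": "Alice"}\n{"id": 2, "name": "Bob"}\n'

# A JSON array must be parsed as a whole before any row is available.
from_json = json.loads(json_text)

# JSONL can be consumed one line at a time.
from_jsonl = list(iter_jsonl(io.StringIO(jsonl_text)))
```
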
scan_parquet(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) PolarsDataFrame[source]

Scan a Parquet file as a PolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the Parquet file

  • schema – Optional explicit schema

  • **options – Format-specific options

Returns:

PolarsDataFrame containing the Parquet data (lazy)

Raises:

RuntimeError – If pandas or pyarrow are not installed

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_parquet("data.parquet")
>>> results = df.collect()
scan_text(path: str, column_name: str = 'value', schema: Sequence[ColumnDef] | None = None, **options: object) PolarsDataFrame[source]

Scan a text file as a single column PolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the text file

  • column_name – Name of the column to create (default: “value”)

  • schema – Optional explicit schema

  • **options – Format-specific options

Returns:

PolarsDataFrame containing the text file lines (lazy)

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_text("data.txt", column_name="line")
>>> results = df.collect()
schema(table_name: str) List[ColumnDef][source]

Get the schema (column definitions) for a table.

Parameters:

table_name – Name of the table

Returns:

List of ColumnDef objects describing the table’s columns

Raises:

ValueError – If table does not exist

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> schema = db.schema("users")
>>> len(schema)
2
>>> schema[0].name
'id'
>>> schema[0].type_name
'INTEGER'
show_schema(table_name: str) None[source]

Print a formatted schema for a table.

Convenience method for interactive exploration.

Parameters:

table_name – Name of the table

Example

>>> db.show_schema("users")
Schema for table 'users':
- id: INTEGER (primary_key=True)
- name: TEXT
- email: TEXT
show_tables(schema: str | None = None) None[source]

Print a formatted list of tables in the database.

Convenience method for interactive exploration.

Parameters:

schema – Optional schema name

Example

>>> db.show_tables()
Tables in database:
- users
- orders
- products
sql(sql: str, **params: object) DataFrame[source]

Execute a SQL query and return a DataFrame.

Similar to PySpark’s spark.sql(), this method accepts a raw SQL string and returns a lazy DataFrame that can be chained with further operations. The SQL dialect is determined by the database connection.

Parameters:
  • sql – SQL query string to execute

  • **params – Optional named parameters for parameterized queries. Use :param_name syntax in SQL and pass values as kwargs.

Returns:

Lazy DataFrame that can be chained with further operations

Example

>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT"), column("age", "INTEGER")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice", "age": 25}, {"id": 2, "name": "Bob", "age": 17}], _database=db).insert_into("users")
>>> # Basic SQL query
>>> df = db.sql("SELECT * FROM users WHERE age > 18")
>>> results = df.collect()
>>> len(results)
1
>>> results[0]["name"]
'Alice'
>>> # Parameterized query
>>> df2 = db.sql("SELECT * FROM users WHERE id = :id", id=1)
>>> results2 = df2.collect()
>>> results2[0]["name"]
'Alice'
>>> # Chaining operations
>>> db.create_table("orders", [column("id", "INTEGER"), column("amount", "REAL")]).collect()
>>> _ = Records(_data=[{"id": 1, "amount": 150.0}, {"id": 2, "amount": 50.0}], _database=db).insert_into("orders")
>>> df3 = db.sql("SELECT * FROM orders").where(col("amount") > 100).limit(1)
>>> results3 = df3.collect()
>>> len(results3)
1
>>> results3[0]["amount"]
150.0
>>> db.close()
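
The :param_name placeholder style used above is also understood natively by the standard-library sqlite3 module, which makes it easy to see why bound parameters are safer than string formatting. In the sketch below, run_query is a hypothetical helper that mirrors the keyword-argument calling convention. It returns plain rows and is not the Moltres sql() method.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [(1, "Alice", 25), (2, "Bob", 17)],
)


def run_query(sql: str, **params: object) -> list:
    """Bind keyword arguments to :name placeholders and fetch all rows."""
    return conn.execute(sql, params).fetchall()


adults = run_query("SELECT name FROM users WHERE age > :min_age", min_age=18)

# A hostile value stays a plain value; it is never spliced into the SQL text.
hostile = run_query("SELECT name FROM users WHERE name = :name", name="x' OR '1'='1")
```
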
table(name: str) TableHandle[source]
table(model_class: Type['DeclarativeBase']) TableHandle

Get a handle to a table in the database.

Delegates to TableManager.

Parameters:

name_or_model – Name of the table, SQLAlchemy model class, or SQLModel model class

Returns:

TableHandle for the specified table

Raises:
  • ValidationError – If table name is invalid

  • ValueError – If model_class is not a valid SQLAlchemy or SQLModel model

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> _ = db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice"}], _database=db).insert_into("users")
>>> # Get table handle by name
>>> users = db.table("users")
>>> df = users.select("id", "name")
>>> results = df.collect()
>>> results[0]["name"]
'Alice'
>>> db.close()
tables(schema: str | None = None) Dict[str, List[ColumnDef]][source]

Get all tables in the database with their schemas.

Parameters:

schema – Optional schema name (for databases that support schemas)

Returns:

Dictionary mapping table names to their column definitions

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER")]).collect()
>>> db.create_table("orders", [column("id", "INTEGER")]).collect()
>>> tables = db.tables()
>>> "users" in tables
True
>>> "orders" in tables
True
>>> len(tables["users"])
1
transaction(savepoint: bool = False, readonly: bool = False, isolation_level: str | None = None, timeout: float | None = None) Iterator[Transaction][source]

Create a transaction context for grouping multiple operations.

All operations within the transaction context share the same transaction. If any exception occurs, the transaction is automatically rolled back. Otherwise, it is committed on successful exit.

Parameters:
  • savepoint – If True and a transaction is already active, create a savepoint instead of raising an error. This enables nested transactions.

  • readonly – If True, set the transaction to read-only mode. Prevents writes.

  • isolation_level – Optional isolation level. Must be one of:

      - “READ UNCOMMITTED”
      - “READ COMMITTED”
      - “REPEATABLE READ”
      - “SERIALIZABLE”

  • timeout – Optional transaction timeout in seconds. Database-specific behavior:

      - PostgreSQL: Sets statement_timeout (milliseconds)
      - MySQL: Sets innodb_lock_wait_timeout (seconds)
      - SQLite: Not supported

Yields:

Transaction object that can be used for explicit commit/rollback

Example

Basic transaction:

>>> with db.transaction() as txn:
...     df.write.insertInto("table")
...     df.write.update("table", where=..., set={...})
...     # If any operation fails, all are rolled back
...     # Otherwise, all are committed on exit

Nested transaction with savepoint:

>>> with db.transaction() as outer:
...     # ... operations ...
...     with db.transaction(savepoint=True) as inner:
...         # ... operations that can be rolled back independently ...
...         inner.savepoint("checkpoint")
...         # ... operations ...
...         inner.rollback_to_savepoint("checkpoint")
...     # outer transaction continues...

Read-only transaction:

>>> with db.transaction(readonly=True) as txn:
...     results = db.table("users").select().collect()

Transaction with isolation level:

>>> with db.transaction(isolation_level="SERIALIZABLE") as txn:
...     # ... critical operations requiring highest isolation ...
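
At the SQL level, the nested-transaction behaviour described above rests on savepoints: work done after a savepoint can be undone without abandoning the outer transaction. The sketch below shows those statements on SQLite through the standard-library sqlite3 module. The savepoint name sp_inner is arbitrary, and the statements are an illustration, not a trace of what Transaction executes.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode, so the BEGIN,
# SAVEPOINT, and COMMIT statements below are the only transaction control.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE events (label TEXT)")

conn.execute("BEGIN")  # outer transaction
conn.execute("INSERT INTO events VALUES ('outer')")

conn.execute("SAVEPOINT sp_inner")  # nested scope
conn.execute("INSERT INTO events VALUES ('inner')")
conn.execute("ROLLBACK TO SAVEPOINT sp_inner")  # undo only the inner work
conn.execute("RELEASE SAVEPOINT sp_inner")

conn.execute("COMMIT")  # the outer insert is kept

labels = [label for (label,) in conn.execute("SELECT label FROM events")]
```
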
update(table_name: str, *, where: Column, set: Mapping[str, object]) int[source]

Update rows in a table.

Convenience method for updating data in a table.

Parameters:
  • table_name – Name of the table to update

  • where – Column expression for the WHERE clause

  • set – Dictionary of column names to new values

Returns:

Number of rows updated

Raises:

ValidationError – If table name is invalid or set dictionary is empty

Example

>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice"}], _database=db).insert_into("users")
>>> # Update rows
>>> count = db.update("users", where=col("id") == 1, set={"name": "Alice Updated"})
>>> count
1
>>> # Verify update
>>> df = db.table("users").select()
>>> results = df.collect()
>>> results[0]["name"]
'Alice Updated'
>>> db.close()
class moltres.table.table.TableHandle(name: str, database: Database, model: Type[Any] | None = None)[source]

Bases: object

Lightweight handle representing a table reference.

A TableHandle provides access to a specific table in the database. It can be created from a table name or from a SQLModel/Pydantic model class.

name

Name of the table

Type:

str

database

The Database instance this handle belongs to

Type:

moltres.table.table.Database

model

Optional SQLModel, Pydantic, or SQLAlchemy model class

Type:

Type[Any] | None

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> handle = db.table("users")
>>> df = handle.select()
columns() → List[str]

Get the list of column names for this table.

Returns:

List of column names

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> handle = db.table("users")
>>> cols = handle.columns()
>>> "id" in cols
True
>>> "name" in cols
True
database: Database
model: Type[Any] | None = None
property model_class: Type['DeclarativeBase'] | None

Get the SQLAlchemy model class if this handle was created from a model.

Returns:

SQLAlchemy model class or None if handle was created from table name

name: str
pandas(*columns: str) → PandasDataFrame

Create a PandasDataFrame from this table.

Parameters:

*columns – Optional column names to select

Returns:

PandasDataFrame for pandas-style operations

Return type:

PandasDataFrame

Example

>>> df = db.table('users').pandas()
>>> df = db.table('users').pandas('id', 'name')
polars(*columns: str) → PolarsDataFrame

Create a PolarsDataFrame from this table.

Parameters:

*columns – Optional column names to select

Returns:

PolarsDataFrame for Polars-style operations

Return type:

PolarsDataFrame

Example

>>> df = db.table('users').polars()
>>> df = db.table('users').polars('id', 'name')
select(*columns: str) → DataFrame

Select columns from this table.

Parameters:

*columns – Optional column names to select. If empty, selects all columns.

Returns:

DataFrame with selected columns

Return type:

DataFrame

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> handle = db.table("users")
>>> df = handle.select("id", "name")
class moltres.table.table.Transaction(database: Database, connection: sqlalchemy.engine.Connection, readonly: bool = False, isolation_level: str | None = None, is_savepoint: bool = False)

Bases: object

Transaction context for grouping multiple operations.

commit() → None

Explicitly commit the transaction.

Raises:

RuntimeError – If the transaction has already been committed or rolled back

is_active() → bool

Check if this transaction is still active.

Returns:

True if the transaction is active (not committed or rolled back), False otherwise

is_readonly() → bool

Check if this transaction is read-only.

Returns:

True if the transaction is read-only, False otherwise

isolation_level() → str | None

Get the transaction isolation level.

Returns:

Isolation level string or None if not set

release_savepoint(name: str) → None

Release a savepoint.

Parameters:

name – Savepoint name to release

Raises:

RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist

rollback() → None

Explicitly roll back the transaction.

Raises:

RuntimeError – If the transaction has already been committed or rolled back

rollback_to_savepoint(name: str) → None

Roll back to a specific savepoint.

Parameters:

name – Name of the savepoint to roll back to

Raises:

RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist

savepoint(name: str | None = None) → str

Create a savepoint within this transaction.

Parameters:

name – Optional savepoint name. If not provided, a unique name is generated.

Returns:

The savepoint name (generated or provided)

Raises:
  • RuntimeError – If the transaction has already been committed or rolled back

  • ValueError – If savepoints are not supported by the dialect
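
The savepoint methods above correspond to the SQL statements SAVEPOINT, ROLLBACK TO SAVEPOINT, and RELEASE SAVEPOINT. The following is a minimal sketch of that sequence using the standard-library sqlite3 module rather than Moltres itself; the table and the savepoint name sp_checkpoint are illustrative assumptions, and the comments only suggest which Transaction method each statement resembles.

```python
import sqlite3

# isolation_level=None puts the stdlib driver in autocommit mode, so BEGIN,
# SAVEPOINT, and COMMIT are issued explicitly by this script.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

conn.execute("BEGIN")
conn.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")

# Comparable to txn.savepoint("sp_checkpoint")
conn.execute("SAVEPOINT sp_checkpoint")
conn.execute("INSERT INTO users (id, name) VALUES (2, 'Bob')")

# Comparable to txn.rollback_to_savepoint("sp_checkpoint"): undoes Bob only
conn.execute("ROLLBACK TO SAVEPOINT sp_checkpoint")

# Comparable to txn.release_savepoint("sp_checkpoint"): drops the marker
conn.execute("RELEASE SAVEPOINT sp_checkpoint")

conn.execute("COMMIT")

names = [row[0] for row in conn.execute("SELECT name FROM users ORDER BY id")]
conn.close()
```

Only the row inserted before the savepoint survives the commit, which is the behavior the outer/inner transaction examples rely on.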

QueryExecutor

Query execution helpers (moltres facade over moltres_core.sql).

Translates moltres_core.exceptions into moltres.utils.exceptions so framework integrations (FastAPI, etc.) keep matching registered handlers.

class moltres.engine.execution.QueryExecutor(connection_manager: ConnectionManager, config: EngineConfig)

Bases: moltres_core.sql.execution.QueryExecutor

Same as moltres_core.sql.execution.QueryExecutor with public Moltres exceptions.

execute(sql: str, params: Dict[str, Any] | None = None, transaction: Any = None) → QueryResult

Execute a non-SELECT SQL statement (INSERT, UPDATE, DELETE, etc.).

Parameters:
  • sql – The SQL statement to execute

  • params – Optional parameter dictionary for parameterized queries

  • transaction – Optional transaction connection to use (if None, uses auto-commit)

Returns:

QueryResult with rowcount of affected rows

Raises:

ExecutionError – If SQL execution fails
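
As an illustration of the sql/params pairing and the affected-row count described above, here is a small sketch using the standard-library sqlite3 module. It does not call QueryExecutor; the table, the named parameters, and the variable names are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, active INTEGER)")
conn.executemany(
    "INSERT INTO users (id, active) VALUES (?, ?)",
    [(1, 1), (2, 1), (3, 0)],
)

# Named placeholders are filled from a dictionary, similar in spirit to the
# `params` argument of execute().
cursor = conn.execute(
    "UPDATE users SET active = :active WHERE id <= :max_id",
    {"active": 0, "max_id": 2},
)

# rowcount reports how many rows the UPDATE touched.
affected = cursor.rowcount
conn.commit()
conn.close()
```

Passing values through a parameter dictionary instead of formatting them into the SQL string lets the driver handle quoting.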

execute_many(sql: str, params_list: Sequence[Dict[str, Any]], transaction: Any = None) → QueryResult

Execute a SQL statement multiple times with different parameter sets.

This is more efficient than calling execute() in a loop for batch inserts.

Parameters:
  • sql – The SQL statement to execute

  • params_list – Sequence of parameter dictionaries, one per execution

  • transaction – Optional transaction connection to use (if None, uses auto-commit)

Returns:

QueryResult with total rowcount across all executions

Raises:

ExecutionError – If SQL execution fails
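
The batch pattern described above (one statement, many parameter dictionaries, a summed row count) can be sketched with sqlite3's executemany. This is an analogy using the standard library, not the Moltres implementation; the table and data are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# One dictionary per execution, as with the `params_list` argument.
params_list = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
    {"id": 3, "name": "Carol"},
]

cursor = conn.executemany(
    "INSERT INTO users (id, name) VALUES (:id, :name)",
    params_list,
)

# For executemany, sqlite3 sums the modifications across all executions.
total = cursor.rowcount
conn.commit()

stored = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
conn.close()
```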

fetch(stmt: str | Any, params: Dict[str, Any] | None = None, connection: Any = None, model: Type[Any] | None = None) → QueryResult

Execute a SELECT query and return results.

Parameters:
  • stmt – The SQLAlchemy Select statement or SQL string to execute

  • params – Optional parameter dictionary for parameterized queries (only used with SQL strings)

  • connection – Optional SQLAlchemy Connection to use. If provided, uses this connection directly instead of creating a new one. Useful for executing within existing transactions. The connection’s lifecycle is not managed by this method.

Returns:

QueryResult containing rows and rowcount

Raises:

ExecutionError – If SQL execution fails
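
To show why reusing a connection matters for reads inside a transaction, the sketch below compares a SELECT issued on the transaction's own connection with the same SELECT on a fresh connection. It uses the standard-library sqlite3 module with a temporary database file, not Moltres; the file name, table, and variable names are assumptions for illustration.

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "example.db")

    # isolation_level=None: transactions are controlled explicitly below.
    writer = sqlite3.connect(path, isolation_level=None)
    writer.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

    writer.execute("BEGIN")
    writer.execute(
        "INSERT INTO users (id, name) VALUES (:id, :name)",
        {"id": 1, "name": "Alice"},
    )

    query = "SELECT name FROM users WHERE id = :id"

    # Same connection as the open transaction: the uncommitted row is visible.
    inside = writer.execute(query, {"id": 1}).fetchall()

    # A separate connection reads committed data only, so it sees no row yet.
    reader = sqlite3.connect(path)
    outside = reader.execute(query, {"id": 1}).fetchall()
    reader.close()

    writer.execute("COMMIT")
    writer.close()
```

A query that must observe pending changes therefore has to run on the connection that holds the transaction.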

class moltres.engine.execution.QueryResult(rows: 'Optional[ResultRows]', rowcount: 'Optional[int]')

Bases: object

rowcount: int | None
rows: Any | None
moltres.engine.execution.register_performance_hook(event: str, callback: Callable[[str, float, dict[str, Any]], None]) → None

Register a performance monitoring hook.

Parameters:
  • event – Event type - “query_start” or “query_end”

  • callback – Callback function that receives (sql, elapsed_time, metadata)

Example

>>> from moltres.engine.execution import register_performance_hook
>>> def log_slow_queries(sql: str, elapsed: float, metadata: dict):
...     if elapsed > 1.0:
...         print(f"Slow query ({elapsed:.2f}s): {sql[:100]}")
>>> register_performance_hook("query_end", log_slow_queries)
moltres.engine.execution.unregister_performance_hook(event: str, callback: Callable[[str, float, dict[str, Any]], None]) → None

Unregister a performance monitoring hook.

Parameters:
  • event – Event type - “query_start” or “query_end”

  • callback – Callback function to remove
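
For readers who want a mental model of the register/unregister pair, here is a self-contained sketch of a hook registry with the same callback shape (sql, elapsed_time, metadata). The names register_hook, unregister_hook, and timed_call are hypothetical and are not part of Moltres; how the library stores and dispatches hooks internally is not documented here.

```python
import time
from collections import defaultdict
from typing import Any, Callable, Dict, List

Hook = Callable[[str, float, Dict[str, Any]], None]

# Hypothetical registry: event name -> callbacks, in registration order.
_hooks: Dict[str, List[Hook]] = defaultdict(list)


def register_hook(event: str, callback: Hook) -> None:
    _hooks[event].append(callback)


def unregister_hook(event: str, callback: Hook) -> None:
    # Removing a callback that was never registered is a no-op.
    if callback in _hooks[event]:
        _hooks[event].remove(callback)


def timed_call(sql: str, run: Callable[[], Any]) -> Any:
    """Run a query function and notify hooks before and after it."""
    for hook in list(_hooks["query_start"]):
        hook(sql, 0.0, {})
    started = time.perf_counter()
    result = run()
    elapsed = time.perf_counter() - started
    for hook in list(_hooks["query_end"]):
        hook(sql, elapsed, {})
    return result


seen: List[str] = []


def record(sql: str, elapsed: float, metadata: Dict[str, Any]) -> None:
    seen.append(sql)


register_hook("query_end", record)
timed_call("SELECT 1", lambda: 1)   # record() is invoked once
unregister_hook("query_end", record)
timed_call("SELECT 2", lambda: 2)   # no longer observed
```

Unregistering requires passing the same callable object that was registered, which is why the example keeps a reference to record.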

ConnectionManager

Synchronous SQLAlchemy connection helpers (provided by moltres_core.sql).

class moltres.engine.connection.ConnectionManager(config: EngineConfig)

Bases: object

Creates and caches SQLAlchemy engines for Moltres sessions.

property active_transaction: sqlalchemy.engine.Connection | None

Get the active transaction connection if one exists.

begin_transaction(savepoint: bool = False, readonly: bool = False, isolation_level: str | None = None, timeout: float | None = None) → sqlalchemy.engine.Connection

Begin a new transaction and return the connection.

Parameters:
  • savepoint – If True and a transaction is already active, create a savepoint instead.

  • readonly – If True, set transaction to read-only mode.

  • isolation_level – Optional isolation level (READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ, SERIALIZABLE).

  • timeout – Optional transaction timeout in seconds.

Returns:

Connection that is part of a transaction (not auto-committed)

Raises:
  • RuntimeError – If savepoint=False and a transaction is already active.

  • ValueError – If an isolation level or read-only mode is requested but not supported by the dialect.
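
The nesting rule above (a second begin raises RuntimeError unless savepoint=True, in which case a savepoint is created) can be sketched with a toy manager built on the standard-library sqlite3 module. ToyTransactionManager, its generated savepoint names, and its error message are hypothetical; the sketch ignores readonly, isolation_level, and timeout and is not the ConnectionManager implementation.

```python
import sqlite3
from typing import List


class ToyTransactionManager:
    """Hypothetical stand-in that mimics the documented nesting rule."""

    def __init__(self, path: str = ":memory:") -> None:
        # Autocommit mode so BEGIN/SAVEPOINT/COMMIT are explicit.
        self._conn = sqlite3.connect(path, isolation_level=None)
        self._active = False
        self.savepoint_stack: List[str] = []

    def begin_transaction(self, savepoint: bool = False) -> sqlite3.Connection:
        if self._active:
            if not savepoint:
                raise RuntimeError("a transaction is already active")
            # Nested begin: create a savepoint instead of a new transaction.
            name = f"sp_{len(self.savepoint_stack) + 1}"
            self._conn.execute(f"SAVEPOINT {name}")
            self.savepoint_stack.append(name)
        else:
            self._conn.execute("BEGIN")
            self._active = True
        return self._conn

    def commit_transaction(self) -> None:
        # COMMIT also releases any savepoints that are still open.
        self._conn.execute("COMMIT")
        self._active = False
        self.savepoint_stack.clear()


manager = ToyTransactionManager()
manager.begin_transaction()                # starts the outer transaction
manager.begin_transaction(savepoint=True)  # nested: becomes a savepoint
stack_during = list(manager.savepoint_stack)

try:
    manager.begin_transaction()            # nested without savepoint=True
    nested_error = None
except RuntimeError as exc:
    nested_error = str(exc)

manager.commit_transaction()
```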

commit_transaction(connection: sqlalchemy.engine.Connection) → None

Commit a transaction.

Parameters:

connection – The transaction connection to commit

connect(transaction: sqlalchemy.engine.Connection | None = None) → Iterator[sqlalchemy.engine.Connection]

Get a database connection.

Parameters:

transaction – If provided, use this transaction connection instead of creating a new one. This allows operations to share a transaction. If None and an active transaction exists, uses the active transaction.

Yields:

Database connection
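
One way to picture the "use the supplied transaction, otherwise open a new connection" behavior is a generator-based context manager. The sketch below uses the standard-library sqlite3 module; toy_connect is a hypothetical helper, and the commit/rollback/close policy for the new-connection branch is an assumption rather than documented Moltres behavior.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator, Optional


@contextmanager
def toy_connect(
    path: str, transaction: Optional[sqlite3.Connection] = None
) -> Iterator[sqlite3.Connection]:
    if transaction is not None:
        # The caller owns this connection: no commit, rollback, or close here.
        yield transaction
        return
    conn = sqlite3.connect(path)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


shared = sqlite3.connect(":memory:")
shared.execute("CREATE TABLE t (x INTEGER)")

with toy_connect(":memory:", transaction=shared) as conn:
    reused = conn is shared
    conn.execute("INSERT INTO t (x) VALUES (1)")

# The shared connection is still open and the insert is still uncommitted.
still_open = shared.execute("SELECT COUNT(*) FROM t").fetchone()[0]
pending = shared.in_transaction

shared.rollback()
after_rollback = shared.execute("SELECT COUNT(*) FROM t").fetchone()[0]
shared.close()
```

Because the helper leaves a supplied connection untouched, several operations can share one transaction and be committed or rolled back together by the caller.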

create_savepoint(connection: sqlalchemy.engine.Connection, name: str) → sqlalchemy.engine.Connection

Create a savepoint in the current transaction.

Parameters:
  • connection – The transaction connection

  • name – Savepoint name

Returns:

The same connection (for compatibility)

Raises:

RuntimeError – If no transaction is active or the connection doesn’t match the active transaction.

property engine: sqlalchemy.engine.Engine
release_savepoint(connection: sqlalchemy.engine.Connection, name: str) → None

Release a savepoint.

Parameters:
  • connection – The transaction connection

  • name – Savepoint name to release

Raises:

RuntimeError – If no transaction is active, the connection doesn’t match, or the savepoint is not found.

rollback_to_savepoint(connection: sqlalchemy.engine.Connection, name: str) → None

Roll back to a specific savepoint.

Parameters:
  • connection – The transaction connection

  • name – Name of the savepoint to roll back to

Raises:

RuntimeError – If no transaction is active, the connection doesn’t match, or the savepoint is not found.

rollback_transaction(connection: sqlalchemy.engine.Connection) → None

Roll back a transaction.

Parameters:

connection – The transaction connection to roll back

property savepoint_stack: list[str]

Get the current savepoint stack.

property transaction_metadata: dict[str, object] | None

Get transaction metadata if a transaction is active.