Table API

Table handles are the entry point for CRUD operations and schema management. They provide eager helpers for inserts, updates, deletes, and DDL.

TableHandle

Table access primitives.

This module provides the core database and table access functionality:

  • Database - Main database connection and query interface

  • TableHandle - Lightweight reference to a database table

  • Transaction - Transaction context manager

The Database class is the primary entry point for all database operations, including table creation, querying, and data mutations.

class moltres.table.table.Database(config: MoltresConfig)[source]

Bases: object

Entry-point object returned by moltres.connect().

The Database class provides the main interface for all database operations in Moltres. It handles connections, query execution, table management, and data mutations.

The Database class supports context manager protocol for automatic connection cleanup. Use it in a with statement to ensure the connection is properly closed.

config

The MoltresConfig instance used for this database

dialect

The SQL dialect being used (e.g., “sqlite”, “postgresql”)

Example

Basic usage:

>>> from moltres import connect, col
>>> db = connect("sqlite:///example.db")
>>> df = db.table("users").select().where(col("active") == True)
>>> results = df.collect()
>>> db.close()

Using context manager (recommended):

>>> with connect("sqlite:///example.db") as db:
...     df = db.table("users").select().where(col("active") == True)
...     results = df.collect()
...     # db.close() called automatically on exit
batch() OperationBatch[source]

Create a batch context for grouping multiple operations.

All operations within the batch context are executed together in a single transaction when the context exits. If any exception occurs, all operations are rolled back.

Returns:

OperationBatch context manager

Example

>>> with db.batch():
...     db.create_table("users", [...])
...     # All operations execute together on exit
close() None[source]

Close all database connections and dispose of the engine.

This should be called when done with the database connection, especially for ephemeral test databases.

Note: After calling close(), the Database instance should not be used.

compile_plan(plan: LogicalPlan) Select[source]

Compile a logical plan to a SQLAlchemy Select statement.

property connection_manager: ConnectionManager

Get the connection manager for this database.

Returns:

The connection manager instance

Return type:

ConnectionManager

createDataFrame(data: Sequence[dict[str, Any]] | Sequence[tuple] | Records | 'LazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) DataFrame[source]

Create a DataFrame from Python data.

Delegates to EphemeralTableManager.

Creates a temporary table, inserts the data, and returns a DataFrame querying from that table. If LazyRecords is provided, it will be auto-materialized. If pandas/polars DataFrame or LazyFrame is provided, it will be converted to Records with lazy conversion.

Parameters:
  • data – Input data in one of supported formats: - List of dicts: [{“col1”: val1, “col2”: val2}, …] - List of tuples: Requires schema parameter with column names - Records object: Extracts data and schema if available - LazyRecords object: Auto-materializes and extracts data and schema - pandas DataFrame: Converts to Records with schema preservation - polars DataFrame: Converts to Records with schema preservation - polars LazyFrame: Materializes and converts to Records with schema preservation

  • schema – Optional explicit schema. If not provided, schema is inferred from data.

  • pk – Optional column name(s) to mark as primary key. Can be a single string or sequence of strings for composite keys.

  • auto_pk – Optional column name(s) to create as auto-incrementing primary key. Can specify same name as pk to make an existing column auto-incrementing.

Returns:

DataFrame querying from the created temporary table

Raises:
  • ValueError – If data is empty and no schema provided, or if primary key requirements are not met

  • ValidationError – If list of tuples provided without schema, or other validation errors

Example

>>> # Create DataFrame from list of dicts
>>> df = db.createDataFrame([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], pk="id")
>>> # Create DataFrame with auto-incrementing primary key
>>> df = db.createDataFrame([{"name": "Alice"}, {"name": "Bob"}], auto_pk="id")
>>> # Create DataFrame from Records
>>> from moltres.io.records import Records
>>> records = Records(_data=[{"id": 1, "name": "Alice"}], _database=db)
>>> df = db.createDataFrame(records, pk="id")
>>> # Create DataFrame from LazyRecords (auto-materializes)
>>> lazy_records = db.read.records.csv("data.csv")
>>> df = db.createDataFrame(lazy_records, pk="id")
>>> # Create DataFrame from pandas DataFrame
>>> import pandas as pd
>>> pdf = pd.DataFrame([{"id": 1, "name": "Alice"}])
>>> df = db.createDataFrame(pdf, pk="id")
>>> # Create DataFrame from polars DataFrame
>>> import polars as pl
>>> plf = pl.DataFrame([{"id": 1, "name": "Alice"}])
>>> df = db.createDataFrame(plf, pk="id")
create_dataframe(data: Sequence[dict[str, object]] | Sequence[tuple] | Records | 'LazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) DataFrame[source]

Create a DataFrame from Python data (snake_case alias for createDataFrame).

This is an alias for createDataFrame(). See createDataFrame() for full documentation.

Parameters:
  • data – Input data in one of supported formats

  • schema – Optional explicit schema

  • pk – Optional column name(s) to mark as primary key

  • auto_pk – Optional column name(s) to create as auto-incrementing primary key

Returns:

DataFrame querying from the created temporary table

create_index(name: str, table: str, columns: str | Sequence[str], *, unique: bool = False, if_not_exists: bool = True) CreateIndexOperation[source]

Create a lazy create index operation.

Delegates to DDLManager.

Parameters:
  • name – Name of the index to create

  • table – Name of the table to create the index on

  • columns – Column name(s) to index (single string or sequence)

  • unique – If True, create a UNIQUE index (default: False)

  • if_not_exists – If True, don’t error if index already exists (default: True)

Returns:

CreateIndexOperation that executes on collect()

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("email", "TEXT"), column("name", "TEXT"), column("age", "INTEGER")]).collect()
>>> # Create single-column index
>>> op = db.create_index("idx_email", "users", "email")
>>> op.collect()  # Executes the CREATE INDEX
>>> # Multi-column index
>>> op2 = db.create_index("idx_name_age", "users", ["name", "age"], unique=True)
>>> op2.collect()
>>> db.close()
create_table(name: str, columns: Sequence[ColumnDef], *, if_not_exists: bool = True, temporary: bool = False, constraints: Sequence['UniqueConstraint' | 'CheckConstraint' | 'ForeignKeyConstraint'] | None = None) CreateTableOperation[source]
create_table(model_class: Type['DeclarativeBase'], *, if_not_exists: bool = True, temporary: bool = False) CreateTableOperation

Create a lazy create table operation.

Delegates to DDLManager.

Parameters:
  • name_or_model – Name of the table to create, or SQLAlchemy model class

  • columns – Sequence of ColumnDef objects defining the table schema (required if name_or_model is str)

  • if_not_exists – If True, don’t error if table already exists (default: True)

  • temporary – If True, create a temporary table (default: False)

  • constraints – Optional sequence of constraint objects (UniqueConstraint, CheckConstraint, ForeignKeyConstraint). Ignored if model_class is provided (constraints are extracted from model).

Returns:

CreateTableOperation that executes on collect()

Raises:
  • ValidationError – If table name or columns are invalid

  • ValueError – If model_class is not a valid SQLAlchemy model

Example

>>> from moltres import connect
>>> from moltres.table.schema import column, unique, check
>>> db = connect("sqlite:///:memory:")
>>> # Create table with constraints
>>> op = db.create_table(
...     "users",
...     [column("id", "INTEGER", primary_key=True), column("email", "TEXT")],
...     constraints=[unique("email"), check("id > 0", name="ck_positive_id")]
... )
>>> table = op.collect()  # Executes the CREATE TABLE
>>> # Verify table was created
>>> tables = db.get_table_names()
>>> "users" in tables
True
>>> db.close()
delete(table_name: str, *, where: Column) int[source]

Delete rows from a table.

Convenience method for deleting data from a table.

Parameters:
  • table_name – Name of the table to delete from

  • whereColumn expression for the WHERE clause

Returns:

Number of rows deleted

Raises:

ValidationError – If table name is invalid

Example

>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import :class:`Records`
>>> _ = :class:`Records`(_data=[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], _database=db).insert_into("users")
>>> # Delete rows
>>> count = db.delete("users", where=col("id") == 1)
>>> count
1
>>> # Verify deletion
>>> df = db.table("users").select()
>>> results = df.collect()
>>> len(results)
1
>>> results[0]["name"]
'Bob'
>>> db.close()
property dialect: DialectSpec
drop_index(name: str, table: str | None = None, *, if_exists: bool = True) DropIndexOperation[source]

Create a lazy drop index operation.

Parameters:
  • name – Name of the index to drop

  • table – Optional table name (required for some dialects like MySQL)

  • if_exists – If True, don’t error if index doesn’t exist (default: True)

Returns:

DropIndexOperation that executes on collect()

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("email", "TEXT")]).collect()
>>> db.create_index("idx_email", "users", "email").collect()
>>> # Drop index
>>> op = db.drop_index("idx_email", "users")
>>> op.collect()  # Executes the DROP INDEX
>>> db.close()
drop_table(name: str, *, if_exists: bool = True) DropTableOperation[source]

Create a lazy drop table operation.

Delegates to DDLManager.

Parameters:
  • name – Name of the table to drop

  • if_exists – If True, don’t error if table doesn’t exist (default: True)

Returns:

DropTableOperation that executes on collect()

Example

>>> op = db.drop_table("users")
>>> op.collect()  # Executes the DROP TABLE
execute_plan(plan: LogicalPlan, model: Type[Any] | None = None) QueryResult[source]

Execute a logical plan and return results.

Delegates to DatabaseQueryExecutor.

execute_plan_stream(plan: LogicalPlan) Iterator[List[Dict[str, object]]][source]

Execute a plan and return an iterator of row chunks.

Delegates to DatabaseQueryExecutor.

execute_sql(sql: str, params: Dict[str, Any] | None = None) QueryResult[source]

Execute a raw SQL query.

Delegates to DatabaseQueryExecutor.

property executor: QueryExecutor

Get the query executor for this database.

Returns:

The query executor instance

Return type:

QueryExecutor

explain(sql: str, params: Dict[str, Any] | None = None) str[source]

Get the execution plan for a SQL query.

Delegates to DatabaseQueryExecutor.

Parameters:
  • sql – SQL query string

  • params – Optional query parameters

Returns:

Execution plan as a string (dialect-specific)

Example

>>> plan = db.explain("SELECT * FROM users WHERE id = :id", params={"id": 1})
>>> print(plan)
classmethod from_connection(connection: sqlalchemy.engine.Connection, **options: object) Database[source]

Create a Database instance from an existing SQLAlchemy Connection.

This allows you to use Moltres with an existing SQLAlchemy Connection, enabling integration within existing transactions.

Note: The Database will use the Connection’s engine, but will not manage the Connection’s lifecycle. The user is responsible for managing the connection.

Parameters:
  • connection – SQLAlchemy Connection instance

  • **options – Optional configuration parameters (same as from_engine)

Returns:

Database instance configured to use the Connection’s engine

Example

>>> from sqlalchemy import create_engine
>>> from moltres import :class:`Database`
>>> engine = create_engine("sqlite:///:memory:")
>>> with engine.connect() as conn:
...     db = :class:`Database`.from_connection(conn)
...     # Use Moltres within the connection's transaction
...     from moltres.table.schema import column
...     _ = db.create_table("users", [column("id", "INTEGER")]).collect()
classmethod from_engine(engine: Engine, **options: object) Database[source]

Create a Database instance from an existing SQLAlchemy Engine.

This allows you to use Moltres with an existing SQLAlchemy Engine, enabling integration with existing SQLAlchemy projects.

Parameters:
  • engine – SQLAlchemy Engine instance

  • **options – Optional configuration parameters: - echo: Enable SQLAlchemy echo mode - fetch_format: Result format - “records”, “pandas”, or “polars” - dialect: Override SQL dialect detection - query_timeout: Query execution timeout in seconds - Other options are stored in config.options

Returns:

Database instance configured to use the provided Engine

Return type:

Database

Example

>>> from sqlalchemy import create_engine
>>> from moltres import :class:`Database`
>>> engine = create_engine("sqlite:///:memory:")
>>> db = :class:`Database`.from_engine(engine)
>>> # Now use Moltres with your existing engine
>>> from moltres.table.schema import column
>>> _ = db.create_table("users", [column("id", "INTEGER")]).collect()
classmethod from_session(session: Session, **options: object) Database[source]

Create a Database instance from a SQLAlchemy ORM Session.

This allows you to use Moltres with an existing SQLAlchemy ORM Session, enabling integration with ORM-based applications.

Note: The Database will use the Session’s bind/engine, but will not manage the Session’s lifecycle. The user is responsible for managing the session.

Parameters:
  • session – SQLAlchemy ORM Session instance

  • **options – Optional configuration parameters (same as from_engine)

Returns:

Database instance configured to use the Session’s bind/engine

Example

>>> from sqlalchemy import create_engine
>>> from sqlalchemy.orm import sessionmaker
>>> from moltres import :class:`Database`
>>> engine = create_engine("sqlite:///:memory:")
>>> Session = sessionmaker(bind=engine)
>>> with Session() as session:
...     db = :class:`Database`.from_session(session)
...     # Use Moltres with your existing session
...     from moltres.table.schema import column
...     _ = db.create_table("users", [column("id", "INTEGER")]).collect()
get_columns(table_name: str) List['ColumnInfo'][source]

Get column information for a table.

Parameters:

table_name – Name of the table to inspect

Returns:

List of ColumnInfo objects with column metadata

Raises:
  • ValidationError – If table name is invalid

  • ValueError – If database connection is not available

  • RuntimeError – If table does not exist or cannot be inspected

Example

>>> columns = db.get_columns("users")
>>> # Returns: [ColumnInfo(name='id', type_name='INTEGER', ...), ...]
get_table_names(schema: str | None = None) List[str][source]

Get list of table names in the database.

Parameters:

schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

Returns:

List of table names

Raises:

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER")]).collect()
>>> db.create_table("orders", [column("id", "INTEGER")]).collect()
>>> # Get all table names
>>> tables = db.get_table_names()
>>> "users" in tables
True
>>> "orders" in tables
True
>>> db.close()
get_transaction_status() dict[str, object] | None[source]

Get transaction status and metadata if a transaction is active.

Returns:

  • readonly: Whether the transaction is read-only

  • isolation_level: Transaction isolation level (if set)

  • timeout: Transaction timeout in seconds (if set)

  • savepoints: List of active savepoint names

None if no transaction is active

Return type:

Dictionary with transaction metadata including

Example

>>> with db.transaction(isolation_level="SERIALIZABLE", readonly=True) as txn:
...     status = db.get_transaction_status()
...     assert status["isolation_level"] == "SERIALIZABLE"
...     assert status["readonly"] is True
get_view_names(schema: str | None = None) List[str][source]

Get list of view names in the database.

Parameters:

schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

Returns:

List of view names

Raises:

Example

>>> views = db.get_view_names()
>>> # Returns: ['active_users_view', 'order_summary_view']
insert(table_name: str, rows: Sequence[Mapping[str, object]] | 'Records' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame') int[source]

Insert rows into a table.

Delegates to TableManager.

Parameters:
  • table_name – Name of the table to insert into

  • rows – Sequence of row dictionaries, Records, pandas DataFrame, polars DataFrame, or polars LazyFrame

Returns:

Number of rows inserted

Raises:

ValidationError – If table name is invalid or rows are empty

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> # Insert rows
>>> count = db.insert("users", [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}])
>>> count
2
>>> # Verify insertion
>>> df = db.table("users").select()
>>> results = df.collect()
>>> len(results)
2
>>> results[0]["name"]
'Alice'
>>> db.close()
is_in_transaction() bool[source]

Check if currently in a transaction.

Returns:

True if a transaction is active, False otherwise

Example

>>> if db.is_in_transaction():
...     status = db.get_transaction_status()
...     print(f"Isolation: {status['isolation_level']}")
property load: DataLoader

Return a DataLoader for loading data from files and tables as DataFrames.

Note: For SQL operations on tables, use db.table(name).select() instead.

merge(table_name: str, rows: Sequence[Mapping[str, object]] | 'Records' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', *, on: Sequence[str], when_matched: Mapping[str, object] | None = None, when_not_matched: Mapping[str, object] | None = None) int[source]

Merge (upsert) rows into a table.

Convenience method for merging data into a table with conflict resolution.

Parameters:
  • table_name – Name of the table to merge into

  • rows – Sequence of row dictionaries, Records, pandas DataFrame, polars DataFrame, or polars LazyFrame

  • on – Sequence of column names that form the conflict key

  • when_matched – Optional dictionary of column updates when a conflict occurs

  • when_not_matched – Optional dictionary of default values when inserting new rows

Returns:

Number of rows affected (inserted or updated)

Raises:

ValidationError – If table name is invalid, rows are empty, or on columns are invalid

Example

>>> db.merge(
...     "users",
...     [{"id": 1, "name": "Alice", "email": "alice@example.com"}],
...     on=["id"],
...     when_matched={"name": "Alice Updated"}
... )
property read: ReadAccessor

Return a ReadAccessor for accessing read operations.

Use db.read.records.* for Records-based reads (backward compatibility). Use db.load.* for DataFrame-based reads (PySpark-style).

reflect(schema: str | None = None, views: bool = False) Dict[str, 'TableSchema'][source]

Reflect entire database schema.

Parameters:
  • schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

  • views – If True, also reflect views (default: False)

Returns:

Dictionary mapping table/view names to TableSchema objects

Raises:

Example

>>> schemas = db.reflect()
>>> # Returns: {'users': TableSchema(...), 'orders': TableSchema(...)}
reflect_table(name: str, schema: str | None = None) TableSchema[source]

Reflect a single table from the database.

Parameters:
  • name – Name of the table to reflect

  • schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

Returns:

TableSchema object with table metadata

Raises:
  • ValidationError – If table name is invalid

  • ValueError – If database connection is not available

  • RuntimeError – If table does not exist or reflection fails

Example

>>> schema = db.reflect_table("users")
>>> # Returns: TableSchema(name='users', columns=[ColumnDef(...), ...])
scan_csv(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) PolarsDataFrame[source]

Scan a CSV file as a PolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the CSV file

  • schema – Optional explicit schema

  • **options – Format-specific options (e.g., header=True, delimiter=”,”)

Returns:

PolarsDataFrame containing the CSV data (lazy)

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_csv("data.csv", header=True)
>>> results = df.collect()
scan_json(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) PolarsDataFrame[source]

Scan a JSON file (array of objects) as a PolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the JSON file

  • schema – Optional explicit schema

  • **options – Format-specific options (e.g., multiline=True)

Returns:

PolarsDataFrame containing the JSON data (lazy)

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_json("data.json")
>>> results = df.collect()
scan_jsonl(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) PolarsDataFrame[source]

Scan a JSONL file (one JSON object per line) as a PolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the JSONL file

  • schema – Optional explicit schema

  • **options – Format-specific options

Returns:

PolarsDataFrame containing the JSONL data (lazy)

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_jsonl("data.jsonl")
>>> results = df.collect()
scan_parquet(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) PolarsDataFrame[source]

Scan a Parquet file as a PolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the Parquet file

  • schema – Optional explicit schema

  • **options – Format-specific options

Returns:

PolarsDataFrame containing the Parquet data (lazy)

Raises:

RuntimeError – If pandas or pyarrow are not installed

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_parquet("data.parquet")
>>> results = df.collect()
scan_text(path: str, column_name: str = 'value', schema: Sequence[ColumnDef] | None = None, **options: object) PolarsDataFrame[source]

Scan a text file as a single column PolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the text file

  • column_name – Name of the column to create (default: “value”)

  • schema – Optional explicit schema

  • **options – Format-specific options

Returns:

PolarsDataFrame containing the text file lines (lazy)

Example

>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_text("data.txt", column_name="line")
>>> results = df.collect()
schema(table_name: str) List[ColumnDef][source]

Get the schema (column definitions) for a table.

Parameters:

table_name – Name of the table

Returns:

List of ColumnDef objects describing the table’s columns

Raises:

ValueError – If table does not exist

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> schema = db.schema("users")
>>> len(schema)
2
>>> schema[0].name
'id'
>>> schema[0].type_name
'INTEGER'
show_schema(table_name: str) None[source]

Print a formatted schema for a table.

Convenience method for interactive exploration.

Parameters:

table_name – Name of the table

Example

>>> db.show_schema("users")
Schema for table 'users':
- id: INTEGER (primary_key=True)
- name: TEXT
- email: TEXT
show_tables(schema: str | None = None) None[source]

Print a formatted list of tables in the database.

Convenience method for interactive exploration.

Parameters:

schema – Optional schema name

Example

>>> db.show_tables()
Tables in database:
- users
- orders
- products
sql(sql: str, **params: object) DataFrame[source]

Execute a SQL query and return a DataFrame.

Similar to PySpark’s spark.sql(), this method accepts a raw SQL string and returns a lazy DataFrame that can be chained with further operations. The SQL dialect is determined by the database connection.

Parameters:
  • sql – SQL query string to execute

  • **params – Optional named parameters for parameterized queries. Use :param_name syntax in SQL and pass values as kwargs.

Returns:

Lazy DataFrame that can be chained with further operations

Example

>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT"), column("age", "INTEGER")]).collect()
>>> from moltres.io.records import :class:`Records`
>>> _ = :class:`Records`(_data=[{"id": 1, "name": "Alice", "age": 25}, {"id": 2, "name": "Bob", "age": 17}], _database=db).insert_into("users")
>>> # Basic SQL query
>>> df = db.sql("SELECT * FROM users WHERE age > 18")
>>> results = df.collect()
>>> len(results)
1
>>> results[0]["name"]
'Alice'
>>> # Parameterized query
>>> df2 = db.sql("SELECT * FROM users WHERE id = :id", id=1)
>>> results2 = df2.collect()
>>> results2[0]["name"]
'Alice'
>>> # Chaining operations
>>> db.create_table("orders", [column("id", "INTEGER"), column("amount", "REAL")]).collect()
>>> _ = :class:`Records`(_data=[{"id": 1, "amount": 150.0}, {"id": 2, "amount": 50.0}], _database=db).insert_into("orders")
>>> df3 = db.sql("SELECT * FROM orders").where(col("amount") > 100).limit(1)
>>> results3 = df3.collect()
>>> len(results3)
1
>>> results3[0]["amount"]
150.0
>>> db.close()
table(name: str) TableHandle[source]
table(model_class: Type['DeclarativeBase']) TableHandle

Get a handle to a table in the database.

Delegates to TableManager.

Parameters:

name_or_model – Name of the table, SQLAlchemy model class, or SQLModel model class

Returns:

TableHandle for the specified table

Raises:
  • ValidationError – If table name is invalid

  • ValueError – If model_class is not a valid SQLAlchemy or SQLModel model

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> _ = db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import :class:`Records`
>>> _ = :class:`Records`(_data=[{"id": 1, "name": "Alice"}], _database=db).insert_into("users")
>>> # Get table handle by name
>>> users = db.table("users")
>>> df = users.select("id", "name")
>>> results = df.collect()
>>> results[0]["name"]
'Alice'
>>> db.close()
tables(schema: str | None = None) Dict[str, List[ColumnDef]][source]

Get all tables in the database with their schemas.

Parameters:

schema – Optional schema name (for databases that support schemas)

Returns:

Dictionary mapping table names to their column definitions

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER")]).collect()
>>> db.create_table("orders", [column("id", "INTEGER")]).collect()
>>> tables = db.tables()
>>> "users" in tables
True
>>> "orders" in tables
True
>>> len(tables["users"])
1
transaction(savepoint: bool = False, readonly: bool = False, isolation_level: str | None = None, timeout: float | None = None) Iterator[Transaction][source]

Create a transaction context for grouping multiple operations.

All operations within the transaction context share the same transaction. If any exception occurs, the transaction is automatically rolled back. Otherwise, it is committed on successful exit.

Parameters:
  • savepoint – If True and a transaction is already active, create a savepoint instead of raising an error. This enables nested transactions.

  • readonly – If True, set the transaction to read-only mode. Prevents writes.

  • isolation_level – Optional isolation level. Must be one of: - “READ UNCOMMITTED” - “READ COMMITTED” - “REPEATABLE READ” - “SERIALIZABLE”

  • timeout – Optional transaction timeout in seconds. Database-specific behavior: - PostgreSQL: Sets statement_timeout (milliseconds) - MySQL: Sets innodb_lock_wait_timeout (seconds) - SQLite: Not supported

Yields:

Transaction object that can be used for explicit commit/rollback

Example

Basic transaction:

>>> with db.transaction() as txn:
...     df.write.insertInto("table")
...     df.write.update("table", where=..., set={...})
...     # If any operation fails, all are rolled back
...     # Otherwise, all are committed on exit

Nested transaction with savepoint:

>>> with db.transaction() as outer:
...     # ... operations ...
...     with db.transaction(savepoint=True) as inner:
...         # ... operations that can be rolled back independently ...
...         inner.savepoint("checkpoint")
...         # ... operations ...
...         inner.rollback_to_savepoint("checkpoint")
...     # outer transaction continues...

Read-only transaction:

>>> with db.transaction(readonly=True) as txn:
...     results = db.table("users").select().collect()

Transaction with isolation level:

>>> with db.transaction(isolation_level="SERIALIZABLE") as txn:
...     # ... critical operations requiring highest isolation ...
update(table_name: str, *, where: Column, set: Mapping[str, object]) int[source]

Update rows in a table.

Convenience method for updating data in a table.

Parameters:
  • table_name – Name of the table to update

  • whereColumn expression for the WHERE clause

  • set – Dictionary of column names to new values

Returns:

Number of rows updated

Raises:

ValidationError – If table name is invalid or set dictionary is empty

Example

>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import :class:`Records`
>>> _ = :class:`Records`(_data=[{"id": 1, "name": "Alice"}], _database=db).insert_into("users")
>>> # Update rows
>>> count = db.update("users", where=col("id") == 1, set={"name": "Alice Updated"})
>>> count
1
>>> # Verify update
>>> df = db.table("users").select()
>>> results = df.collect()
>>> results[0]["name"]
'Alice Updated'
>>> db.close()
class moltres.table.table.TableHandle(name: str, database: Database, model: Type[Any] | None = None)[source]

Bases: object

Lightweight handle representing a table reference.

A TableHandle provides access to a specific table in the database. It can be created from a table name or from a SQLModel/Pydantic model class.

name

Name of the table

Type:

str

database

The Database instance this handle belongs to

Type:

moltres.table.table.Database

model

Optional SQLModel, Pydantic, or SQLAlchemy model class

Type:

Type[Any] | None

Example

>>> db = connect("sqlite:///:memory:")
>>> handle = db.table("users")
>>> df = handle.select()
columns() List[str][source]

Get the list of column names for this table.

Returns:

List of column names

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> handle = db.table("users")
>>> cols = handle.columns()
>>> "id" in cols
True
>>> "name" in cols
True
database: Database
model: Type[Any] | None = None
property model_class: Type['DeclarativeBase'] | None

Get the SQLAlchemy model class if this handle was created from a model.

Returns:

SQLAlchemy model class or None if handle was created from table name

name: str
pandas(*columns: str) PandasDataFrame[source]

Create a PandasDataFrame from this table.

Parameters:

*columns – Optional column names to select

Returns:

PandasDataFrame for pandas-style operations

Return type:

PandasDataFrame

Example

>>> df = db.table('users').pandas()
>>> df = db.table('users').pandas('id', 'name')
polars(*columns: str) PolarsDataFrame[source]

Create a PolarsDataFrame from this table.

Parameters:

*columns – Optional column names to select

Returns:

PolarsDataFrame for Polars-style operations

Return type:

PolarsDataFrame

Example

>>> df = db.table('users').polars()
>>> df = db.table('users').polars('id', 'name')
select(*columns: str) DataFrame[source]

Select columns from this table.

Parameters:

*columns – Optional column names to select. If empty, selects all columns.

Returns:

DataFrame with selected columns

Return type:

DataFrame

Example

>>> db = connect("sqlite:///:memory:")
>>> handle = db.table("users")
>>> df = handle.select("id", "name")
class moltres.table.table.Transaction(database: Database, connection: sqlalchemy.engine.Connection, readonly: bool = False, isolation_level: str | None = None, is_savepoint: bool = False)[source]

Bases: object

Transaction context for grouping multiple operations.

commit() None[source]

Explicitly commit the transaction.

Raises:

RuntimeError – If the transaction has already been committed or rolled back

is_active() bool[source]

Check if this transaction is still active.

Returns:

True if the transaction is active (not committed or rolled back), False otherwise

is_readonly() bool[source]

Check if this transaction is read-only.

Returns:

True if the transaction is read-only, False otherwise

isolation_level() str | None[source]

Get the transaction isolation level.

Returns:

Isolation level string or None if not set

release_savepoint(name: str) None[source]

Release a savepoint.

Parameters:

name – Savepoint name to release

Raises:

RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist

rollback() None[source]

Explicitly rollback the transaction.

Raises:

RuntimeError – If the transaction has already been committed or rolled back

rollback_to_savepoint(name: str) None[source]

Rollback to a specific savepoint.

Parameters:

name – Savepoint name to rollback to

Raises:

RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist

savepoint(name: str | None = None) str[source]

Create a savepoint within this transaction.

Parameters:

name – Optional savepoint name. If not provided, a unique name is generated.

Returns:

The savepoint name (generated or provided)

Raises:
  • RuntimeError – If the transaction has already been committed or rolled back

  • ValueError – If savepoints are not supported by the dialect

AsyncTableHandle

Async table access primitives.

class moltres.table.async_table.AsyncDatabase(config: MoltresConfig)[source]

Bases: object

Entry-point object returned by moltres.async_connect.

The AsyncDatabase class supports async context manager protocol for automatic connection cleanup. Use it in an async with statement to ensure the connection is properly closed.

Example

Using async context manager (recommended):

>>> async with async_connect("sqlite+aiosqlite:///:memory:") as db:
...     df = db.sql("SELECT * FROM users")
...     results = await df.collect()
...     # await db.close() called automatically on exit
async close() None[source]

Close the database connection and cleanup resources.

compile_plan(plan: LogicalPlan) Any[source]

Compile a logical plan to SQL.

property connection_manager: AsyncConnectionManager
async createDataFrame(data: Sequence[dict[str, Any]] | Sequence[tuple] | 'AsyncRecords' | 'AsyncLazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) AsyncDataFrame[source]

Create an AsyncDataFrame from Python data (list of dicts, list of tuples, AsyncRecords, AsyncLazyRecords, pandas DataFrame, polars DataFrame, or polars LazyFrame).

Creates a temporary table, inserts the data, and returns an AsyncDataFrame querying from that table. If AsyncLazyRecords is provided, it will be auto-materialized. If pandas/polars DataFrame or LazyFrame is provided, it will be converted to Records with lazy conversion.

Parameters:
  • data – Input data in one of supported formats: - List of dicts: [{“col1”: val1, “col2”: val2}, …] - List of tuples: Requires schema parameter with column names - AsyncRecords object: Extracts data and schema if available - AsyncLazyRecords object: Auto-materializes and extracts data and schema - pandas DataFrame: Converts to Records with schema preservation - polars DataFrame: Converts to Records with schema preservation - polars LazyFrame: Materializes and converts to Records with schema preservation

  • schema – Optional explicit schema. If not provided, schema is inferred from data.

  • pk – Optional column name(s) to mark as primary key. Can be a single string or sequence of strings for composite keys.

  • auto_pk – Optional column name(s) to create as auto-incrementing primary key. Can specify same name as pk to make an existing column auto-incrementing.

Returns:

AsyncDataFrame querying from the created temporary table

Raises:
  • ValueError – If data is empty and no schema provided, or if primary key requirements are not met

  • ValidationError – If list of tuples provided without schema, or other validation errors

Example

>>> # Create AsyncDataFrame from list of dicts
>>> df = await db.createDataFrame([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], pk="id")
>>> # Create AsyncDataFrame with auto-incrementing primary key
>>> df = await db.createDataFrame([{"name": "Alice"}, {"name": "Bob"}], auto_pk="id")
>>> # Create AsyncDataFrame from AsyncLazyRecords (auto-materializes)
>>> lazy_records = db.read.records.csv("data.csv")
>>> df = await db.createDataFrame(lazy_records, pk="id")
async create_dataframe(data: Sequence[dict[str, object]] | Sequence[tuple] | 'AsyncRecords' | 'AsyncLazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) AsyncDataFrame[source]

Create an AsyncDataFrame from Python data (snake_case alias for createDataFrame).

This is an alias for createDataFrame(). See createDataFrame() for full documentation.

Parameters:
  • data – Input data in one of supported formats

  • schema – Optional explicit schema

  • pk – Optional column name(s) to mark as primary key

  • auto_pk – Optional column name(s) to create as auto-incrementing primary key

Returns:

AsyncDataFrame querying from the created temporary table

create_index(name: str, table: str, columns: str | Sequence[str], *, unique: bool = False, if_not_exists: bool = True) AsyncCreateIndexOperation[source]

Create a lazy async create index operation.

Parameters:
  • name – Name of the index to create

  • table – Name of the table to create the index on

  • columnsColumn name(s) to index (single string or sequence)

  • unique – If True, create a UNIQUE index (default: False)

  • if_not_exists – If True, don’t error if index already exists (default: True)

Returns:

AsyncCreateIndexOperation that executes on collect()

Example

>>> op = db.create_index("idx_email", "users", "email")
>>> await op.collect()  # Executes the CREATE INDEX
>>> # Multi-column index
>>> op2 = db.create_index("idx_name_age", "users", ["name", "age"], unique=True)
create_table(name: str, columns: Sequence[ColumnDef], *, if_not_exists: bool = True, temporary: bool = False, constraints: Sequence['UniqueConstraint' | 'CheckConstraint' | 'ForeignKeyConstraint'] | None = None) AsyncCreateTableOperation[source]
create_table(model_class: Type['DeclarativeBase'], *, if_not_exists: bool = True, temporary: bool = False) AsyncCreateTableOperation

Create a lazy async create table operation.

Parameters:
  • name_or_model – Name of the table to create, or SQLAlchemy model class

  • columns – Sequence of ColumnDef objects defining the table schema (required if name_or_model is str)

  • if_not_exists – If True, don’t error if table already exists (default: True)

  • temporary – If True, create a temporary table (default: False)

  • constraints – Optional sequence of constraint objects (UniqueConstraint, CheckConstraint, ForeignKeyConstraint). Ignored if model_class is provided (constraints are extracted from model).

Returns:

AsyncCreateTableOperation that executes on collect()

Example

>>> import asyncio
>>> from moltres import async_connect
>>> from moltres.table.schema import column, unique, check
>>> async def example():
...     db = await async_connect("sqlite+aiosqlite:///:memory:")
...     # Create table with schema
...     op = db.create_table(
...         "users",
...         [column("id", "INTEGER", primary_key=True), column("email", "TEXT")],
...         constraints=[unique("email"), check("id > 0", name="ck_positive_id")]
...     )
...     table = await op.collect()  # Executes the CREATE TABLE
...     # Verify table was created
...     tables = await db.get_table_names()
...     "users" in tables
...     True
...     await db.close()
...     # asyncio.run(example())
Raises:
  • ValidationError – If table name or columns are invalid

  • ValueError – If model_class is not a valid SQLAlchemy model

Example

>>> op = db.create_table("users", [column("id", "INTEGER")])
>>> table = await op.collect()  # Executes the CREATE TABLE
>>> # Or with SQLAlchemy model
>>> from sqlalchemy.orm import DeclarativeBase
>>> class User(Base):
...     __tablename__ = "users"
...     id = :class:`Column`(Integer, primary_key=True)
>>> op = db.create_table(User)
>>> table = await op.collect()
property dialect: DialectSpec
drop_index(name: str, table: str | None = None, *, if_exists: bool = True) AsyncDropIndexOperation[source]

Create a lazy async drop index operation.

Parameters:
  • name – Name of the index to drop

  • table – Optional table name (required for some dialects like MySQL)

  • if_exists – If True, don’t error if index doesn’t exist (default: True)

Returns:

AsyncDropIndexOperation that executes on collect()

Example

>>> op = db.drop_index("idx_email", "users")
>>> await op.collect()  # Executes the DROP INDEX
drop_table(name: str, *, if_exists: bool = True) AsyncDropTableOperation[source]

Create a lazy async drop table operation.

Parameters:
  • name – Name of the table to drop

  • if_exists – If True, don’t error if table doesn’t exist (default: True)

Returns:

AsyncDropTableOperation that executes on collect()

Example

>>> op = db.drop_table("users")
>>> await op.collect()  # Executes the DROP TABLE
async execute_plan(plan: LogicalPlan, model: Type[Any] | None = None) AsyncQueryResult[source]

Execute a logical plan and return results.

async execute_plan_stream(plan: LogicalPlan) AsyncIterator[List[Dict[str, object]]][source]

Execute a plan and return an async iterator of row chunks.

async execute_sql(sql: str, params: Dict[str, Any] | None = None) AsyncQueryResult[source]

Execute raw SQL and return results.

property executor: AsyncQueryExecutor
classmethod from_async_connection(connection: sqlalchemy.ext.asyncio.engine.AsyncConnection, **options: object) AsyncDatabase[source]

Create an AsyncDatabase instance from an existing SQLAlchemy AsyncConnection.

This allows you to use Moltres with an existing SQLAlchemy AsyncConnection, enabling integration within existing async transactions.

Note: The AsyncDatabase will use the AsyncConnection’s engine, but will not manage the AsyncConnection’s lifecycle. The user is responsible for managing the connection.

Parameters:
  • connection – SQLAlchemy AsyncConnection instance

  • **options – Optional configuration parameters (same as from_async_engine)

Returns:

AsyncDatabase instance configured to use the AsyncConnection’s engine

Example

>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from moltres import :class:`AsyncDatabase`
>>> engine = create_async_engine("sqlite+aiosqlite:///:memory:")
>>> async with engine.connect() as conn:
...     db = :class:`AsyncDatabase`.from_async_connection(conn)
...     # Use Moltres within the connection's transaction
...     from moltres.table.schema import column
...     await db.create_table("users", [column("id", "INTEGER")]).collect()
classmethod from_async_engine(engine: AsyncEngine, **options: object) AsyncDatabase[source]

Create an AsyncDatabase instance from an existing SQLAlchemy AsyncEngine.

This allows you to use Moltres with an existing SQLAlchemy AsyncEngine, enabling integration with existing async SQLAlchemy projects.

Parameters:
  • engine – SQLAlchemy AsyncEngine instance

  • **options – Optional configuration parameters: - echo: Enable SQLAlchemy echo mode - fetch_format: Result format - “records”, “pandas”, or “polars” - dialect: Override SQL dialect detection - query_timeout: Query execution timeout in seconds - Other options are stored in config.options

Returns:

AsyncDatabase instance configured to use the provided AsyncEngine

Example

>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from moltres import :class:`AsyncDatabase`
>>> engine = create_async_engine("sqlite+aiosqlite:///:memory:")
>>> db = :class:`AsyncDatabase`.from_async_engine(engine)
>>> # Now use Moltres with your existing async engine
>>> from moltres.table.schema import column
>>> await db.create_table("users", [column("id", "INTEGER")]).collect()
classmethod from_async_session(session: AsyncSession, **options: object) AsyncDatabase[source]

Create an AsyncDatabase instance from a SQLAlchemy AsyncSession.

This allows you to use Moltres with an existing SQLAlchemy AsyncSession, enabling integration with async ORM-based applications.

Note: The AsyncDatabase will use the AsyncSession’s bind/engine, but will not manage the AsyncSession’s lifecycle. The user is responsible for managing the session.

Parameters:
  • session – SQLAlchemy AsyncSession instance

  • **options – Optional configuration parameters (same as from_async_engine)

Returns:

AsyncDatabase instance configured to use the AsyncSession’s bind/engine

Example

>>> from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
>>> from moltres import :class:`AsyncDatabase`
>>> engine = create_async_engine("sqlite+aiosqlite:///:memory:")
>>> AsyncSession = async_sessionmaker(bind=engine)
>>> async with AsyncSession() as session:
...     db = :class:`AsyncDatabase`.from_async_session(session)
...     # Use Moltres with your existing async session
...     from moltres.table.schema import column
...     await db.create_table("users", [column("id", "INTEGER")]).collect()
async get_columns(table_name: str) List['ColumnInfo'][source]

Get column information for a table.

Parameters:

table_name – Name of the table to inspect

Returns:

List of ColumnInfo objects with column metadata

Raises:
  • ValidationError – If table name is invalid

  • ValueError – If database connection is not available

  • RuntimeError – If table does not exist or cannot be inspected

Example

>>> columns = await db.get_columns("users")
>>> # Returns: [ColumnInfo(name='id', type_name='INTEGER', ...), ...]
async get_table_names(schema: str | None = None) List[str][source]

Get list of table names in the database.

Parameters:

schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

Returns:

List of table names

Raises:

Example

>>> import asyncio
>>> from moltres import async_connect
>>> from moltres.table.schema import column
>>> async def example():
...     db = await async_connect("sqlite+aiosqlite:///:memory:")
...     await db.create_table("users", [column("id", "INTEGER")]).collect()
...     await db.create_table("orders", [column("id", "INTEGER")]).collect()
...     # Get all table names
...     tables = await db.get_table_names()
...     "users" in tables
...     True
...     "orders" in tables
...     True
...     await db.close()
...     # asyncio.run(example())
get_transaction_status() dict[str, object] | None[source]

Get transaction status and metadata if a transaction is active.

Returns:

  • readonly: Whether the transaction is read-only

  • isolation_level: Transaction isolation level (if set)

  • timeout: Transaction timeout in seconds (if set)

  • savepoints: List of active savepoint names

None if no transaction is active

Return type:

Dictionary with transaction metadata including

Example

>>> async with db.transaction(isolation_level="SERIALIZABLE", readonly=True) as txn:
...     status = db.get_transaction_status()
...     assert status["isolation_level"] == "SERIALIZABLE"
...     assert status["readonly"] is True
async get_view_names(schema: str | None = None) List[str][source]

Get list of view names in the database.

Parameters:

schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

Returns:

List of view names

Raises:

Example

>>> views = await db.get_view_names()
>>> # Returns: ['active_users_view', 'order_summary_view']
is_in_transaction() bool[source]

Check if currently in a transaction.

Returns:

True if a transaction is active, False otherwise

Example

>>> if db.is_in_transaction():
...     status = db.get_transaction_status()
...     print(f"Isolation: {status['isolation_level']}")
property load: AsyncDataLoader

Return an AsyncDataLoader for loading data from files and tables as AsyncDataFrames.

Note: For SQL operations on tables, use await db.table(name).select() instead.

property read: AsyncReadAccessor

Return an AsyncReadAccessor for accessing read operations.

Use await db.read.records.* for AsyncRecords-based reads (backward compatibility). Use db.load.* for AsyncDataFrame-based reads (PySpark-style).

async reflect(schema: str | None = None, views: bool = False) Dict[str, 'TableSchema'][source]

Reflect entire database schema.

Parameters:
  • schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

  • views – If True, also reflect views (default: False)

Returns:

Dictionary mapping table/view names to TableSchema objects

Raises:

Example

>>> schemas = await db.reflect()
>>> # Returns: {'users': TableSchema(...), 'orders': TableSchema(...)}
async reflect_table(name: str, schema: str | None = None) TableSchema[source]

Reflect a single table from the database.

Parameters:
  • name – Name of the table to reflect

  • schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.

Returns:

TableSchema object with table metadata

Raises:
  • ValidationError – If table name is invalid

  • ValueError – If database connection is not available

  • RuntimeError – If table does not exist or reflection fails

Example

>>> schema = await db.reflect_table("users")
>>> # Returns: TableSchema(name='users', columns=[ColumnDef(...), ...])
async scan_csv(path: str, schema: Sequence['ColumnDef'] | None = None, **options: object) AsyncPolarsDataFrame[source]

Scan a CSV file as an AsyncPolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the CSV file

  • schema – Optional explicit schema

  • **options – Format-specific options (e.g., header=True, delimiter=”,”)

Returns:

AsyncPolarsDataFrame containing the CSV data (lazy)

Example

>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> df = await db.scan_csv("data.csv", header=True)
>>> results = await df.collect()
async scan_json(path: str, schema: Sequence['ColumnDef'] | None = None, **options: object) AsyncPolarsDataFrame[source]

Scan a JSON file (array of objects) as an AsyncPolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the JSON file

  • schema – Optional explicit schema

  • **options – Format-specific options (e.g., multiline=True)

Returns:

AsyncPolarsDataFrame containing the JSON data (lazy)

Example

>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> df = await db.scan_json("data.json")
>>> results = await df.collect()
async scan_jsonl(path: str, schema: Sequence['ColumnDef'] | None = None, **options: object) AsyncPolarsDataFrame[source]

Scan a JSONL file (one JSON object per line) as an AsyncPolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the JSONL file

  • schema – Optional explicit schema

  • **options – Format-specific options

Returns:

AsyncPolarsDataFrame containing the JSONL data (lazy)

Example

>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> df = await db.scan_jsonl("data.jsonl")
>>> results = await df.collect()
async scan_parquet(path: str, schema: Sequence['ColumnDef'] | None = None, **options: object) AsyncPolarsDataFrame[source]

Scan a Parquet file as an AsyncPolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the Parquet file

  • schema – Optional explicit schema

  • **options – Format-specific options

Returns:

AsyncPolarsDataFrame containing the Parquet data (lazy)

Raises:

RuntimeError – If pandas or pyarrow are not installed

Example

>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> df = await db.scan_parquet("data.parquet")
>>> results = await df.collect()
async scan_text(path: str, column_name: str = 'value', schema: Sequence['ColumnDef'] | None = None, **options: object) AsyncPolarsDataFrame[source]

Scan a text file as a single column AsyncPolarsDataFrame (Polars-style).

Parameters:
  • path – Path to the text file

  • column_name – Name of the column to create (default: “value”)

  • schema – Optional explicit schema

  • **options – Format-specific options

Returns:

AsyncPolarsDataFrame containing the text file lines (lazy)

Example

>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> df = await db.scan_text("data.txt", column_name="line")
>>> results = await df.collect()
sql(sql: str, **params: object) AsyncDataFrame[source]

Execute a SQL query and return an AsyncDataFrame.

Similar to PySpark’s spark.sql(), this method accepts a raw SQL string and returns a lazy AsyncDataFrame that can be chained with further operations. The SQL dialect is determined by the database connection.

Parameters:
  • sql – SQL query string to execute

  • **params – Optional named parameters for parameterized queries. Use :param_name syntax in SQL and pass values as kwargs.

Returns:

Lazy AsyncDataFrame that can be chained with further operations

Example

>>> import asyncio
>>> from moltres import async_connect, col
>>> from moltres.table.schema import column
>>> async def example():
...     db = await async_connect("sqlite+aiosqlite:///:memory:")
...     await db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT"), column("age", "INTEGER")]).collect()
...     from moltres.io.records import :class:`AsyncRecords`
...     records = :class:`AsyncRecords`(_data=[{"id": 1, "name": "Alice", "age": 25}, {"id": 2, "name": "Bob", "age": 17}], _database=db)
...     await records.insert_into("users")
...     # Basic SQL query
...     df = db.sql("SELECT * FROM users WHERE age > 18")
...     results = await df.collect()
...     len(results)
...     1
...     # Parameterized query
...     df2 = db.sql("SELECT * FROM users WHERE id = :id AND name = :name", id=1, name="Alice")
...     results2 = await df2.collect()
...     results2[0]["name"]
...     'Alice'
...     await db.close()
...     # asyncio.run(example())
async table(name: str) AsyncTableHandle[source]
async table(model_class: Type['DeclarativeBase']) AsyncTableHandle

Get a handle to a table in the database.

Parameters:

name_or_model – Name of the table, SQLAlchemy model class, or SQLModel model class

Returns:

AsyncTableHandle for the specified table

Raises:
  • ValidationError – If table name is invalid

  • ValueError – If model_class is not a valid SQLAlchemy or SQLModel model

Example

>>> import asyncio
>>> from moltres import async_connect
>>> from moltres.table.schema import column
>>> async def example():
...     db = await async_connect("sqlite+aiosqlite:///:memory:")
...     await db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
...     from moltres.io.records import :class:`AsyncRecords`
...     records = :class:`AsyncRecords`(_data=[{"id": 1, "name": "Alice"}], _database=db)
...     await records.insert_into("users")
...     # Get table handle
...     users = await db.table("users")
...     df = users.select("id", "name")
...     results = await df.collect()
...     results[0]["name"]
...     'Alice'
...     await db.close()
...     # asyncio.run(example())
transaction(savepoint: bool = False, readonly: bool = False, isolation_level: str | None = None, timeout: float | None = None) AsyncIterator[AsyncTransaction][source]

Create an async transaction context for grouping multiple operations.

All operations within the transaction context share the same transaction. If any exception occurs, the transaction is automatically rolled back. Otherwise, it is committed on successful exit.

Parameters:
  • savepoint – If True and a transaction is already active, create a savepoint instead of raising an error. This enables nested transactions.

  • readonly – If True, set the transaction to read-only mode. Prevents writes.

  • isolation_level – Optional isolation level. Must be one of: - “READ UNCOMMITTED” - “READ COMMITTED” - “REPEATABLE READ” - “SERIALIZABLE”

  • timeout – Optional transaction timeout in seconds. Database-specific behavior: - PostgreSQL: Sets statement_timeout (milliseconds) - MySQL: Sets innodb_lock_wait_timeout (seconds) - SQLite: Not supported

Yields:

AsyncTransaction object that can be used for explicit commit/rollback

Example

Basic transaction:

>>> async with db.transaction() as txn:
...     await df.write.insertInto("table")
...     await df.write.update("table", where=..., set={...})
...     # If any operation fails, all are rolled back
...     # Otherwise, all are committed on exit

Nested transaction with savepoint:

>>> async with db.transaction() as outer:
...     # ... operations ...
...     async with db.transaction(savepoint=True) as inner:
...         # ... operations that can be rolled back independently ...
...         await inner.savepoint("checkpoint")
...         # ... operations ...
...         await inner.rollback_to_savepoint("checkpoint")
...     # outer transaction continues...

Read-only transaction:

>>> async with db.transaction(readonly=True) as txn:
...     results = await db.table("users").select().collect()

Transaction with isolation level:

>>> async with db.transaction(isolation_level="SERIALIZABLE") as txn:
...     # ... critical operations requiring highest isolation ...
class moltres.table.async_table.AsyncTableHandle(name: str, database: AsyncDatabase, model: Type[Any] | None = None)[source]

Bases: object

Lightweight handle representing a table reference for async operations.

database: AsyncDatabase
model: Type[Any] | None = None
property model_class: Type['DeclarativeBase'] | None

Get the SQLAlchemy model class if this handle was created from a model.

Returns:

SQLAlchemy model class or None if handle was created from table name

name: str
pandas() AsyncPandasDataFrame[source]

Create an AsyncPandasDataFrame from this table (Pandas-style entry point).

Returns:

AsyncPandasDataFrame querying from this table

Example

>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> table = await db.table("users")
>>> df = table.pandas()
>>> results = await df.collect()
polars() AsyncPolarsDataFrame[source]

Create an AsyncPolarsDataFrame from this table (Polars-style entry point).

Returns:

AsyncPolarsDataFrame querying from this table

Example

>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> table = await db.table("users")
>>> df = table.polars()
>>> results = await df.collect()
select(*columns: str) AsyncDataFrame[source]
class moltres.table.async_table.AsyncTransaction(database: AsyncDatabase, connection: sqlalchemy.ext.asyncio.engine.AsyncConnection, readonly: bool = False, isolation_level: str | None = None, is_savepoint: bool = False)[source]

Bases: object

Async transaction context for grouping multiple operations.

async commit() None[source]

Explicitly commit the transaction.

is_active() bool[source]

Check if this transaction is still active.

Returns:

True if the transaction is active (not committed or rolled back), False otherwise

is_readonly() bool[source]

Check if this transaction is read-only.

Returns:

True if the transaction is read-only, False otherwise

isolation_level() str | None[source]

Get the transaction isolation level.

Returns:

Isolation level string or None if not set

async release_savepoint(name: str) None[source]

Release a savepoint.

Parameters:

name – Savepoint name to release

Raises:

RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist

async rollback() None[source]

Explicitly rollback the transaction.

async rollback_to_savepoint(name: str) None[source]

Rollback to a specific savepoint.

Parameters:

name – Savepoint name to rollback to

Raises:

RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist

async savepoint(name: str | None = None) str[source]

Create a savepoint within this transaction.

Parameters:

name – Optional savepoint name. If not provided, a unique name is generated.

Returns:

The savepoint name (generated or provided)

Raises:
  • RuntimeError – If the transaction has already been committed or rolled back

  • ValueError – If savepoints are not supported by the dialect

Schema

Schema definition primitives for table creation.

class moltres.table.schema.CheckConstraint(name: str | None = None, expression: str = '')[source]

Bases: object

Definition of a CHECK constraint.

expression: str = ''
name: str | None = None
class moltres.table.schema.ColumnDef(name: str, type_name: str, nullable: bool = True, default: object | None = None, primary_key: bool = False, precision: int | None = None, scale: int | None = None)[source]

Bases: object

Definition of a single table column.

default: object | None = None
name: str
nullable: bool = True
precision: int | None = None
primary_key: bool = False
scale: int | None = None
type_name: str
class moltres.table.schema.ForeignKeyConstraint(name: str | None = None, columns: str | Sequence[str] = (), references_table: str = '', references_columns: str | Sequence[str] = (), on_delete: str | None = None, on_update: str | None = None)[source]

Bases: object

Definition of a FOREIGN KEY constraint.

columns: str | Sequence[str] = ()
name: str | None = None
on_delete: str | None = None
on_update: str | None = None
references_columns: str | Sequence[str] = ()
references_table: str = ''
class moltres.table.schema.TableSchema(name: str, columns: Sequence[ColumnDef], if_not_exists: bool = True, temporary: bool = False, constraints: Sequence[UniqueConstraint | CheckConstraint | ForeignKeyConstraint] = ())[source]

Bases: object

Complete schema definition for a table.

columns: Sequence[ColumnDef]
constraints: Sequence[UniqueConstraint | CheckConstraint | ForeignKeyConstraint] = ()
if_not_exists: bool = True
name: str
temporary: bool = False
class moltres.table.schema.UniqueConstraint(name: str | None = None, columns: str | Sequence[str] = ())[source]

Bases: object

Definition of a UNIQUE constraint.

columns: str | Sequence[str] = ()
name: str | None = None
moltres.table.schema.check(expression: str, name: str | None = None) CheckConstraint[source]

Convenience helper for creating CHECK constraints.

Parameters:
  • expression – SQL expression for the check constraint (e.g., “age > 0”)

  • name – Optional constraint name

Returns:

CheckConstraint object

Example

>>> from moltres.table.schema import check
>>> ck = check("age >= 0 AND age <= 150", name="ck_valid_age")
moltres.table.schema.column(name: str, type_name: str, nullable: bool = True, default: object | None = None, primary_key: bool = False, precision: int | None = None, scale: int | None = None) ColumnDef[source]

Convenience helper for creating column definitions.

Parameters:
  • name – Column name

  • type_name – SQL type name (e.g., “INTEGER”, “TEXT”, “REAL”, “DECIMAL”)

  • nullable – Whether the column allows NULL values (default: True)

  • default – Default value for the column (default: None)

  • primary_key – Whether this column is a primary key (default: False)

  • precision – Precision for DECIMAL/NUMERIC types (default: None)

  • scale – Scale for DECIMAL/NUMERIC types (default: None)

Returns:

ColumnDef object for use in table creation

Return type:

ColumnDef

Example

>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> # Create table with column definitions
>>> _ = db.create_table(
...     "users",
...     [
...         column("id", "INTEGER", primary_key=True),
...         column("name", "TEXT", nullable=False),
...         column("age", "INTEGER"),
...         column("balance", "DECIMAL", precision=10, scale=2)
...     ]
... ).collect()
>>> from moltres.io.records import :class:`Records`
>>> _ = :class:`Records`(_data=[{"id": 1, "name": "Alice", "age": 30, "balance": 100.50}], _database=db).insert_into("users")
>>> df = db.table("users").select()
>>> results = df.collect()
>>> results[0]["name"]
'Alice'
>>> results[0]["age"]
30
>>> db.close()
moltres.table.schema.decimal(name: str, precision: int, scale: int = 0, nullable: bool = True, default: object | None = None, primary_key: bool = False) ColumnDef[source]

Convenience helper for creating DECIMAL/NUMERIC column definitions.

Parameters:
  • nameColumn name

  • precision – Total number of digits

  • scale – Number of digits after the decimal point

  • nullable – Whether the column can be NULL

  • default – Default value for the column

  • primary_key – Whether this column is a primary key

Returns:

ColumnDef with type_name=”DECIMAL” and precision/scale set

Example

>>> from moltres.table.schema import decimal
>>> col = decimal("price", precision=10, scale=2)  # DECIMAL(10, 2)
moltres.table.schema.foreign_key(columns: str | Sequence[str], references_table: str, references_columns: str | Sequence[str], name: str | None = None, on_delete: str | None = None, on_update: str | None = None) ForeignKeyConstraint[source]

Convenience helper for creating FOREIGN KEY constraints.

Parameters:
  • columnsColumn name(s) in this table

  • references_table – Name of the referenced table

  • references_columnsColumn name(s) in the referenced table

  • name – Optional constraint name

  • on_delete – Optional action on delete (e.g., “CASCADE”, “SET NULL”, “RESTRICT”)

  • on_update – Optional action on update (e.g., “CASCADE”, “SET NULL”, “RESTRICT”)

Returns:

ForeignKeyConstraint object

Example

>>> from moltres.table.schema import foreign_key
>>> # Single column foreign key
>>> fk1 = foreign_key("user_id", "users", "id", on_delete="CASCADE")
>>> # Multi-column foreign key
>>> fk2 = foreign_key(["order_id", "item_id"], "order_items", ["id", "id"])
moltres.table.schema.json(name: str, nullable: bool = True, default: object | None = None, jsonb: bool = False) ColumnDef[source]

Convenience helper for creating JSON/JSONB column definitions.

Parameters:
  • nameColumn name

  • nullable – Whether the column can be NULL

  • default – Default value for the column

  • jsonb – If True, use JSONB (PostgreSQL only), otherwise use JSON

Returns:

ColumnDef with type_name=”JSONB” (PostgreSQL with jsonb=True), “JSON” (MySQL/PostgreSQL), or “TEXT” (SQLite)

Example

>>> from moltres.table.schema import json
>>> col = json("data")  # JSON type
>>> col2 = json("metadata", jsonb=True)  # JSONB type (PostgreSQL)
moltres.table.schema.unique(columns: str | Sequence[str], name: str | None = None) UniqueConstraint[source]

Convenience helper for creating UNIQUE constraints.

Parameters:
  • columnsColumn name(s) for the unique constraint

  • name – Optional constraint name

Returns:

UniqueConstraint object

Example

>>> from moltres.table.schema import unique
>>> # Single column unique constraint
>>> uq1 = unique("email")
>>> # Multi-column unique constraint
>>> uq2 = unique(["user_id", "session_id"], name="uq_user_session")
moltres.table.schema.uuid(name: str, nullable: bool = True, default: object | None = None, primary_key: bool = False) ColumnDef[source]

Convenience helper for creating UUID column definitions.

Parameters:
  • nameColumn name

  • nullable – Whether the column can be NULL

  • default – Default value for the column

  • primary_key – Whether this column is a primary key

Returns:

ColumnDef with type_name=”UUID” (PostgreSQL) or “CHAR(36)” (MySQL) or “TEXT” (SQLite)

Example

>>> from moltres.table.schema import uuid
>>> col = uuid("id", primary_key=True)  # UUID type