Table API
Table handles are the entry point for CRUD operations and schema management. They provide eager helpers for inserts, updates, deletes, and DDL.
TableHandle
Table access primitives.
This module provides the core database and table access functionality:
- Database: Main database connection and query interface
- TableHandle: Lightweight reference to a database table
- Transaction: Transaction context manager
The Database class is the primary entry point for all database operations,
including table creation, querying, and data mutations.
- class moltres.table.table.Database(config: MoltresConfig)
Bases: object
Entry-point object returned by moltres.connect().
The Database class provides the main interface for all database operations in Moltres. It handles connections, query execution, table management, and data mutations.
The Database class supports the context manager protocol for automatic connection cleanup. Use it in a with statement to ensure the connection is properly closed.
- config
The MoltresConfig instance used for this database
- dialect
The SQL dialect being used (e.g., “sqlite”, “postgresql”)
Example
Basic usage:
>>> from moltres import connect, col
>>> db = connect("sqlite:///example.db")
>>> df = db.table("users").select().where(col("active") == True)
>>> results = df.collect()
>>> db.close()
Using context manager (recommended):
>>> with connect("sqlite:///example.db") as db:
...     df = db.table("users").select().where(col("active") == True)
...     results = df.collect()
...     # db.close() called automatically on exit
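The context manager protocol mentioned above can be illustrated with a minimal sketch. `ManagedConnection` is a hypothetical wrapper around the standard-library `sqlite3` module, not Moltres code; it only shows how `__exit__` guarantees cleanup when a `with` block ends.

```python
import sqlite3


class ManagedConnection:
    """Hypothetical wrapper (not part of Moltres) that closes its connection on exit."""

    def __init__(self, path: str) -> None:
        self.conn = sqlite3.connect(path)
        self.closed = False

    def close(self) -> None:
        # Idempotent: closing twice is harmless.
        if not self.closed:
            self.conn.close()
            self.closed = True

    def __enter__(self) -> "ManagedConnection":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.close()
        return False  # never swallow exceptions raised in the with-body


with ManagedConnection(":memory:") as db:
    value = db.conn.execute("SELECT 1").fetchone()[0]

# After the block, the connection has been closed automatically.
```

The same cleanup runs when the body raises, which is why the `with` form is usually preferable to a manual `close()` call.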
- batch() -> OperationBatch
Create a batch context for grouping multiple operations.
All operations within the batch context are executed together in a single transaction when the context exits. If any exception occurs, all operations are rolled back.
- Returns:
OperationBatch context manager
Example
>>> with db.batch():
...     db.create_table("users", [...])
...     # All operations execute together on exit
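The queue-then-execute behavior described for batch() can be sketched as follows. `SketchBatch` is a hypothetical class built on the standard-library `sqlite3` module; it is an assumption about the general pattern, not the OperationBatch implementation.

```python
import sqlite3
from typing import List, Tuple


class SketchBatch:
    """Hypothetical stand-in for a batch context; not the Moltres implementation."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.pending: List[Tuple[str, tuple]] = []

    def add(self, sql: str, params: tuple = ()) -> None:
        # Nothing touches the database yet; the statement is only queued.
        self.pending.append((sql, params))

    def __enter__(self) -> "SketchBatch":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is not None:
            self.pending.clear()  # the body failed, so run nothing
            return False
        try:
            self.conn.execute("BEGIN")
            for sql, params in self.pending:
                self.conn.execute(sql, params)
            self.conn.execute("COMMIT")
        except Exception:
            self.conn.execute("ROLLBACK")  # undo every statement in the batch
            raise
        return False


# isolation_level=None gives manual control over BEGIN/COMMIT/ROLLBACK.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

with SketchBatch(conn) as batch:
    batch.add("INSERT INTO users VALUES (?, ?)", (1, "Alice"))
    batch.add("INSERT INTO users VALUES (?, ?)", (2, "Bob"))

try:
    with SketchBatch(conn) as batch:
        batch.add("INSERT INTO users VALUES (?, ?)", (3, "Carol"))
        batch.add("INSERT INTO users VALUES (?, ?)", (1, "Duplicate"))  # violates the primary key
except sqlite3.IntegrityError:
    pass

row_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

The second batch fails on its last statement, so the insert of row 3 is rolled back with it and only the two rows from the first batch remain.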
- close() -> None
Close all database connections and dispose of the engine.
This should be called when done with the database connection, especially for ephemeral test databases.
Note: After calling close(), the Database instance should not be used.
- compile_plan(plan: LogicalPlan) -> Select
Compile a logical plan to a SQLAlchemy Select statement.
- property connection_manager: ConnectionManager
Get the connection manager for this database.
- Returns:
The connection manager instance
- Return type:
ConnectionManager
- createDataFrame(data: Sequence[dict[str, Any]] | Sequence[tuple] | Records | 'LazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) -> DataFrame
Create a DataFrame from Python data.
Delegates to EphemeralTableManager.
Creates a temporary table, inserts the data, and returns a DataFrame querying from that table. If LazyRecords is provided, it is auto-materialized. If a pandas/polars DataFrame or LazyFrame is provided, it is converted to Records with lazy conversion.
- Parameters:
data – Input data in one of the supported formats:
- List of dicts: [{"col1": val1, "col2": val2}, …]
- List of tuples: Requires schema parameter with column names
- Records object: Extracts data and schema if available
- LazyRecords object: Auto-materializes and extracts data and schema
- pandas DataFrame: Converts to Records with schema preservation
- polars DataFrame: Converts to Records with schema preservation
- polars LazyFrame: Materializes and converts to Records with schema preservation
schema – Optional explicit schema. If not provided, schema is inferred from data.
pk – Optional column name(s) to mark as primary key. Can be a single string or sequence of strings for composite keys.
auto_pk – Optional column name(s) to create as auto-incrementing primary key. Can specify same name as pk to make an existing column auto-incrementing.
- Returns:
DataFrame querying from the created temporary table
- Raises:
ValueError – If data is empty and no schema provided, or if primary key requirements are not met
ValidationError – If list of tuples provided without schema, or other validation errors
Example
>>> # Create DataFrame from list of dicts
>>> df = db.createDataFrame([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], pk="id")
>>> # Create DataFrame with auto-incrementing primary key
>>> df = db.createDataFrame([{"name": "Alice"}, {"name": "Bob"}], auto_pk="id")
>>> # Create DataFrame from Records
>>> from moltres.io.records import Records
>>> records = Records(_data=[{"id": 1, "name": "Alice"}], _database=db)
>>> df = db.createDataFrame(records, pk="id")
>>> # Create DataFrame from LazyRecords (auto-materializes)
>>> lazy_records = db.read.records.csv("data.csv")
>>> df = db.createDataFrame(lazy_records, pk="id")
>>> # Create DataFrame from pandas DataFrame
>>> import pandas as pd
>>> pdf = pd.DataFrame([{"id": 1, "name": "Alice"}])
>>> df = db.createDataFrame(pdf, pk="id")
>>> # Create DataFrame from polars DataFrame
>>> import polars as pl
>>> plf = pl.DataFrame([{"id": 1, "name": "Alice"}])
>>> df = db.createDataFrame(plf, pk="id")
- create_dataframe(data: Sequence[dict[str, object]] | Sequence[tuple] | Records | 'LazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) -> DataFrame
Create a DataFrame from Python data (snake_case alias for createDataFrame).
This is an alias for createDataFrame(). See createDataFrame() for full documentation.
- Parameters:
data – Input data in one of supported formats
schema – Optional explicit schema
pk – Optional column name(s) to mark as primary key
auto_pk – Optional column name(s) to create as auto-incrementing primary key
- Returns:
DataFrame querying from the created temporary table
- create_index(name: str, table: str, columns: str | Sequence[str], *, unique: bool = False, if_not_exists: bool = True) -> CreateIndexOperation
Create a lazy create index operation.
Delegates to DDLManager.
- Parameters:
name – Name of the index to create
table – Name of the table to create the index on
columns – Column name(s) to index (single string or sequence)
unique – If True, create a UNIQUE index (default: False)
if_not_exists – If True, don’t error if index already exists (default: True)
- Returns:
CreateIndexOperation that executes on collect()
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("email", "TEXT"), column("name", "TEXT"), column("age", "INTEGER")]).collect()
>>> # Create single-column index
>>> op = db.create_index("idx_email", "users", "email")
>>> op.collect()  # Executes the CREATE INDEX
>>> # Multi-column index
>>> op2 = db.create_index("idx_name_age", "users", ["name", "age"], unique=True)
>>> op2.collect()
>>> db.close()
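To make the `unique` and `if_not_exists` flags concrete, here is a sketch of the kind of SQL they presumably correspond to on SQLite. It uses the standard-library `sqlite3` module directly; the exact DDL that Moltres emits is an assumption and may differ per dialect.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, email TEXT, name TEXT, age INTEGER)")

# Assumed rough equivalent of create_index("idx_email", "users", "email", unique=True)
ddl = "CREATE UNIQUE INDEX IF NOT EXISTS idx_email ON users (email)"
conn.execute(ddl)
conn.execute(ddl)  # IF NOT EXISTS turns the repeat into a no-op instead of an error

conn.execute("INSERT INTO users VALUES (1, 'a@example.com', 'Alice', 30)")
try:
    # Same email again: the UNIQUE index rejects the row.
    conn.execute("INSERT INTO users VALUES (2, 'a@example.com', 'Bob', 25)")
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

# PRAGMA index_list rows are (seq, name, unique, origin, partial).
index_names = [row[1] for row in conn.execute("PRAGMA index_list('users')")]
```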
- create_table(name: str, columns: Sequence[ColumnDef], *, if_not_exists: bool = True, temporary: bool = False, constraints: Sequence['UniqueConstraint' | 'CheckConstraint' | 'ForeignKeyConstraint'] | None = None) -> CreateTableOperation
- create_table(model_class: Type['DeclarativeBase'], *, if_not_exists: bool = True, temporary: bool = False) -> CreateTableOperation
Create a lazy create table operation.
Delegates to DDLManager.
- Parameters:
name_or_model – Name of the table to create, or SQLAlchemy model class
columns – Sequence of ColumnDef objects defining the table schema (required if name_or_model is str)
if_not_exists – If True, don’t error if table already exists (default: True)
temporary – If True, create a temporary table (default: False)
constraints – Optional sequence of constraint objects (UniqueConstraint, CheckConstraint, ForeignKeyConstraint). Ignored if model_class is provided (constraints are extracted from model).
- Returns:
CreateTableOperation that executes on collect()
- Raises:
ValidationError – If table name or columns are invalid
ValueError – If model_class is not a valid SQLAlchemy model
Example
>>> from moltres import connect
>>> from moltres.table.schema import column, unique, check
>>> db = connect("sqlite:///:memory:")
>>> # Create table with constraints
>>> op = db.create_table(
...     "users",
...     [column("id", "INTEGER", primary_key=True), column("email", "TEXT")],
...     constraints=[unique("email"), check("id > 0", name="ck_positive_id")]
... )
>>> table = op.collect()  # Executes the CREATE TABLE
>>> # Verify table was created
>>> tables = db.get_table_names()
>>> "users" in tables
True
>>> db.close()
- delete(table_name: str, *, where: Column) -> int
Delete rows from a table.
Convenience method for deleting data from a table.
- Parameters:
table_name – Name of the table to delete from
where – Column expression for the WHERE clause
- Returns:
Number of rows deleted
- Raises:
ValidationError – If table name is invalid
Example
>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], _database=db).insert_into("users")
>>> # Delete rows
>>> count = db.delete("users", where=col("id") == 1)
>>> count
1
>>> # Verify deletion
>>> df = db.table("users").select()
>>> results = df.collect()
>>> len(results)
1
>>> results[0]["name"]
'Bob'
>>> db.close()
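The "number of rows deleted" return value corresponds to the affected-row count that database drivers report. A minimal sketch with the standard-library `sqlite3` module, shown here as an illustration of the concept rather than of Moltres internals:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "Alice"), (2, "Bob")])

# The cursor's rowcount reports how many rows the DELETE affected.
cur = conn.execute("DELETE FROM users WHERE id = ?", (1,))
deleted = cur.rowcount

remaining = [name for (name,) in conn.execute("SELECT name FROM users")]
```

A WHERE clause that matches nothing yields a count of 0 rather than an error, so callers can use the count to detect no-op deletes.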
- property dialect: DialectSpec
- drop_index(name: str, table: str | None = None, *, if_exists: bool = True) -> DropIndexOperation
Create a lazy drop index operation.
- Parameters:
name – Name of the index to drop
table – Optional table name (required for some dialects like MySQL)
if_exists – If True, don’t error if index doesn’t exist (default: True)
- Returns:
DropIndexOperation that executes on collect()
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("email", "TEXT")]).collect()
>>> db.create_index("idx_email", "users", "email").collect()
>>> # Drop index
>>> op = db.drop_index("idx_email", "users")
>>> op.collect()  # Executes the DROP INDEX
>>> db.close()
- drop_table(name: str, *, if_exists: bool = True) -> DropTableOperation
Create a lazy drop table operation.
Delegates to DDLManager.
- Parameters:
name – Name of the table to drop
if_exists – If True, don’t error if table doesn’t exist (default: True)
- Returns:
DropTableOperation that executes on collect()
Example
>>> op = db.drop_table("users")
>>> op.collect()  # Executes the DROP TABLE
- execute_plan(plan: LogicalPlan, model: Type[Any] | None = None) -> QueryResult
Execute a logical plan and return results.
Delegates to DatabaseQueryExecutor.
- execute_plan_stream(plan: LogicalPlan) -> Iterator[List[Dict[str, object]]]
Execute a plan and return an iterator of row chunks.
Delegates to DatabaseQueryExecutor.
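The return type, an iterator of lists of row dictionaries, suggests chunked fetching. The sketch below shows that pattern with the standard-library `sqlite3` module; `stream_rows` and its `chunk_size` parameter are hypothetical names, and how Moltres chunks results internally is not documented here.

```python
import sqlite3
from typing import Dict, Iterator, List


def stream_rows(
    conn: sqlite3.Connection, sql: str, chunk_size: int = 2
) -> Iterator[List[Dict[str, object]]]:
    """Hypothetical helper: yield query results in chunks of row dictionaries."""
    cur = conn.execute(sql)
    names = [d[0] for d in cur.description]  # column names from the cursor
    while True:
        rows = cur.fetchmany(chunk_size)
        if not rows:
            break
        yield [dict(zip(names, row)) for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(5)])

chunks = list(stream_rows(conn, "SELECT id FROM t ORDER BY id"))
chunk_sizes = [len(chunk) for chunk in chunks]
```

Consuming the iterator lazily keeps only one chunk in memory at a time, which is the usual reason to prefer a streaming call for large results.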
- execute_sql(sql: str, params: Dict[str, Any] | None = None) -> QueryResult
Execute a raw SQL query.
Delegates to DatabaseQueryExecutor.
- property executor: QueryExecutor
Get the query executor for this database.
- Returns:
The query executor instance
- Return type:
QueryExecutor
- explain(sql: str, params: Dict[str, Any] | None = None) -> str
Get the execution plan for a SQL query.
Delegates to DatabaseQueryExecutor.
- Parameters:
sql – SQL query string
params – Optional query parameters
- Returns:
Execution plan as a string (dialect-specific)
Example
>>> plan = db.explain("SELECT * FROM users WHERE id = :id", params={"id": 1})
>>> print(plan)
- classmethod from_connection(connection: sqlalchemy.engine.Connection, **options: object) -> Database
Create a Database instance from an existing SQLAlchemy Connection.
This allows you to use Moltres with an existing SQLAlchemy Connection, enabling integration within existing transactions.
Note: The Database will use the Connection’s engine, but will not manage the Connection’s lifecycle. The user is responsible for managing the connection.
- Parameters:
connection – SQLAlchemy Connection instance
**options – Optional configuration parameters (same as from_engine)
- Returns:
Database instance configured to use the Connection’s engine
Example
>>> from sqlalchemy import create_engine
>>> from moltres import Database
>>> engine = create_engine("sqlite:///:memory:")
>>> with engine.connect() as conn:
...     db = Database.from_connection(conn)
...     # Use Moltres within the connection's transaction
...     from moltres.table.schema import column
...     _ = db.create_table("users", [column("id", "INTEGER")]).collect()
- classmethod from_engine(engine: Engine, **options: object) -> Database
Create a Database instance from an existing SQLAlchemy Engine.
This allows you to use Moltres with an existing SQLAlchemy Engine, enabling integration with existing SQLAlchemy projects.
- Parameters:
engine – SQLAlchemy Engine instance
**options – Optional configuration parameters:
- echo: Enable SQLAlchemy echo mode
- fetch_format: Result format, one of "records", "pandas", or "polars"
- dialect: Override SQL dialect detection
- query_timeout: Query execution timeout in seconds
- Other options are stored in config.options
- Returns:
Database instance configured to use the provided Engine
- Return type:
Database
Example
>>> from sqlalchemy import create_engine
>>> from moltres import Database
>>> engine = create_engine("sqlite:///:memory:")
>>> db = Database.from_engine(engine)
>>> # Now use Moltres with your existing engine
>>> from moltres.table.schema import column
>>> _ = db.create_table("users", [column("id", "INTEGER")]).collect()
- classmethod from_session(session: Session, **options: object) -> Database
Create a Database instance from a SQLAlchemy ORM Session.
This allows you to use Moltres with an existing SQLAlchemy ORM Session, enabling integration with ORM-based applications.
Note: The Database will use the Session’s bind/engine, but will not manage the Session’s lifecycle. The user is responsible for managing the session.
- Parameters:
session – SQLAlchemy ORM Session instance
**options – Optional configuration parameters (same as from_engine)
- Returns:
Database instance configured to use the Session’s bind/engine
Example
>>> from sqlalchemy import create_engine
>>> from sqlalchemy.orm import sessionmaker
>>> from moltres import Database
>>> engine = create_engine("sqlite:///:memory:")
>>> Session = sessionmaker(bind=engine)
>>> with Session() as session:
...     db = Database.from_session(session)
...     # Use Moltres with your existing session
...     from moltres.table.schema import column
...     _ = db.create_table("users", [column("id", "INTEGER")]).collect()
- get_columns(table_name: str) -> List['ColumnInfo']
Get column information for a table.
- Parameters:
table_name – Name of the table to inspect
- Returns:
List of ColumnInfo objects with column metadata
- Raises:
ValidationError – If table name is invalid
ValueError – If database connection is not available
RuntimeError – If table does not exist or cannot be inspected
Example
>>> columns = db.get_columns("users")
>>> # Returns: [ColumnInfo(name='id', type_name='INTEGER', ...), ...]
- get_table_names(schema: str | None = None) -> List[str]
Get list of table names in the database.
- Parameters:
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
- Returns:
List of table names
- Raises:
ValueError – If database connection is not available
RuntimeError – If inspection fails
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER")]).collect()
>>> db.create_table("orders", [column("id", "INTEGER")]).collect()
>>> # Get all table names
>>> tables = db.get_table_names()
>>> "users" in tables
True
>>> "orders" in tables
True
>>> db.close()
- get_transaction_status() -> dict[str, object] | None
Get transaction status and metadata if a transaction is active.
- Returns:
Dictionary with transaction metadata, or None if no transaction is active. The dictionary includes:
- readonly: Whether the transaction is read-only
- isolation_level: Transaction isolation level (if set)
- timeout: Transaction timeout in seconds (if set)
- savepoints: List of active savepoint names
- Return type:
dict[str, object] | None
Example
>>> with db.transaction(isolation_level="SERIALIZABLE", readonly=True) as txn:
...     status = db.get_transaction_status()
...     assert status["isolation_level"] == "SERIALIZABLE"
...     assert status["readonly"] is True
- get_view_names(schema: str | None = None) -> List[str]
Get list of view names in the database.
- Parameters:
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
- Returns:
List of view names
- Raises:
ValueError – If database connection is not available
RuntimeError – If inspection fails
Example
>>> views = db.get_view_names()
>>> # Returns: ['active_users_view', 'order_summary_view']
- insert(table_name: str, rows: Sequence[Mapping[str, object]] | 'Records' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame') -> int
Insert rows into a table.
Delegates to TableManager.
- Parameters:
table_name – Name of the table to insert into
rows – Sequence of row dictionaries, Records, pandas DataFrame, polars DataFrame, or polars LazyFrame
- Returns:
Number of rows inserted
- Raises:
ValidationError – If table name is invalid or rows are empty
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> # Insert rows
>>> count = db.insert("users", [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}])
>>> count
2
>>> # Verify insertion
>>> df = db.table("users").select()
>>> results = df.collect()
>>> len(results)
2
>>> results[0]["name"]
'Alice'
>>> db.close()
- is_in_transaction() -> bool
Check if currently in a transaction.
- Returns:
True if a transaction is active, False otherwise
Example
>>> if db.is_in_transaction():
...     status = db.get_transaction_status()
...     print(f"Isolation: {status['isolation_level']}")
- property load: DataLoader
Return a DataLoader for loading data from files and tables as DataFrames.
Note: For SQL operations on tables, use db.table(name).select() instead.
- merge(table_name: str, rows: Sequence[Mapping[str, object]] | 'Records' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', *, on: Sequence[str], when_matched: Mapping[str, object] | None = None, when_not_matched: Mapping[str, object] | None = None) -> int
Merge (upsert) rows into a table.
Convenience method for merging data into a table with conflict resolution.
- Parameters:
table_name – Name of the table to merge into
rows – Sequence of row dictionaries, Records, pandas DataFrame, polars DataFrame, or polars LazyFrame
on – Sequence of column names that form the conflict key
when_matched – Optional dictionary of column updates when a conflict occurs
when_not_matched – Optional dictionary of default values when inserting new rows
- Returns:
Number of rows affected (inserted or updated)
- Raises:
ValidationError – If table name is invalid, rows are empty, or on columns are invalid
Example
>>> db.merge(
...     "users",
...     [{"id": 1, "name": "Alice", "email": "alice@example.com"}],
...     on=["id"],
...     when_matched={"name": "Alice Updated"}
... )
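Upsert semantics on a conflict key can be sketched with SQLite's `INSERT ... ON CONFLICT ... DO UPDATE` (available from SQLite 3.24). This uses the standard-library `sqlite3` module and is only an illustration of the idea; the SQL that merge() generates is an assumption and will vary by dialect. Unlike the `when_matched` mapping above, this sketch takes the new name from the incoming row via `excluded`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# The conflict key (here: id) must be backed by a PRIMARY KEY or UNIQUE constraint.
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'Alice', 'alice@example.com')")

upsert = """
INSERT INTO users (id, name, email) VALUES (:id, :name, :email)
ON CONFLICT (id) DO UPDATE SET name = excluded.name
"""
rows = [
    # id 1 already exists: only the name is updated, the stored email is kept.
    {"id": 1, "name": "Alice Updated", "email": "ignored@example.com"},
    # id 2 is new: the row is inserted as given.
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
]
conn.executemany(upsert, rows)

result = conn.execute("SELECT id, name, email FROM users ORDER BY id").fetchall()
```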
- property read: ReadAccessor
Return a ReadAccessor for accessing read operations.
Use db.read.records.* for Records-based reads (backward compatibility). Use db.load.* for DataFrame-based reads (PySpark-style).
- reflect(schema: str | None = None, views: bool = False) -> Dict[str, 'TableSchema']
Reflect entire database schema.
- Parameters:
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
views – If True, also reflect views (default: False)
- Returns:
Dictionary mapping table/view names to TableSchema objects
- Raises:
ValueError – If database connection is not available
RuntimeError – If reflection fails
Example
>>> schemas = db.reflect()
>>> # Returns: {'users': TableSchema(...), 'orders': TableSchema(...)}
- reflect_table(name: str, schema: str | None = None) -> TableSchema
Reflect a single table from the database.
- Parameters:
name – Name of the table to reflect
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
- Returns:
TableSchema object with table metadata
- Raises:
ValidationError – If table name is invalid
ValueError – If database connection is not available
RuntimeError – If table does not exist or reflection fails
Example
>>> schema = db.reflect_table("users")
>>> # Returns: TableSchema(name='users', columns=[ColumnDef(...), ...])
- scan_csv(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) -> PolarsDataFrame
Scan a CSV file as a PolarsDataFrame (Polars-style).
- Parameters:
path – Path to the CSV file
schema – Optional explicit schema
**options – Format-specific options (e.g., header=True, delimiter=",")
- Returns:
PolarsDataFrame containing the CSV data (lazy)
Example
>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_csv("data.csv", header=True)
>>> results = df.collect()
- scan_json(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) -> PolarsDataFrame
Scan a JSON file (array of objects) as a PolarsDataFrame (Polars-style).
- Parameters:
path – Path to the JSON file
schema – Optional explicit schema
**options – Format-specific options (e.g., multiline=True)
- Returns:
PolarsDataFrame containing the JSON data (lazy)
Example
>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_json("data.json")
>>> results = df.collect()
- scan_jsonl(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) -> PolarsDataFrame
Scan a JSONL file (one JSON object per line) as a PolarsDataFrame (Polars-style).
- Parameters:
path – Path to the JSONL file
schema – Optional explicit schema
**options – Format-specific options
- Returns:
PolarsDataFrame containing the JSONL data (lazy)
Example
>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_jsonl("data.jsonl")
>>> results = df.collect()
- scan_parquet(path: str, schema: Sequence[ColumnDef] | None = None, **options: object) -> PolarsDataFrame
Scan a Parquet file as a PolarsDataFrame (Polars-style).
- Parameters:
path – Path to the Parquet file
schema – Optional explicit schema
**options – Format-specific options
- Returns:
PolarsDataFrame containing the Parquet data (lazy)
- Raises:
RuntimeError – If pandas or pyarrow are not installed
Example
>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_parquet("data.parquet")
>>> results = df.collect()
- scan_text(path: str, column_name: str = 'value', schema: Sequence[ColumnDef] | None = None, **options: object) -> PolarsDataFrame
Scan a text file as a single-column PolarsDataFrame (Polars-style).
- Parameters:
path – Path to the text file
column_name – Name of the column to create (default: “value”)
schema – Optional explicit schema
**options – Format-specific options
- Returns:
PolarsDataFrame containing the text file lines (lazy)
Example
>>> from moltres import connect
>>> db = connect("sqlite:///:memory:")
>>> df = db.scan_text("data.txt", column_name="line")
>>> results = df.collect()
- schema(table_name: str) -> List[ColumnDef]
Get the schema (column definitions) for a table.
- Parameters:
table_name – Name of the table
- Returns:
List of ColumnDef objects describing the table’s columns
- Raises:
ValueError – If table does not exist
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> schema = db.schema("users")
>>> len(schema)
2
>>> schema[0].name
'id'
>>> schema[0].type_name
'INTEGER'
- show_schema(table_name: str) -> None
Print a formatted schema for a table.
Convenience method for interactive exploration.
- Parameters:
table_name – Name of the table
Example
>>> db.show_schema("users")
Schema for table 'users':
- id: INTEGER (primary_key=True)
- name: TEXT
- email: TEXT
- show_tables(schema: str | None = None) -> None
Print a formatted list of tables in the database.
Convenience method for interactive exploration.
- Parameters:
schema – Optional schema name
Example
>>> db.show_tables()
Tables in database:
- users
- orders
- products
- sql(sql: str, **params: object) -> DataFrame
Execute a SQL query and return a DataFrame.
Similar to PySpark’s spark.sql(), this method accepts a raw SQL string and returns a lazy DataFrame that can be chained with further operations. The SQL dialect is determined by the database connection.
- Parameters:
sql – SQL query string to execute
**params – Optional named parameters for parameterized queries. Use :param_name syntax in SQL and pass values as kwargs.
- Returns:
Lazy DataFrame that can be chained with further operations
Example
>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT"), column("age", "INTEGER")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice", "age": 25}, {"id": 2, "name": "Bob", "age": 17}], _database=db).insert_into("users")
>>> # Basic SQL query
>>> df = db.sql("SELECT * FROM users WHERE age > 18")
>>> results = df.collect()
>>> len(results)
1
>>> results[0]["name"]
'Alice'
>>> # Parameterized query
>>> df2 = db.sql("SELECT * FROM users WHERE id = :id", id=1)
>>> results2 = df2.collect()
>>> results2[0]["name"]
'Alice'
>>> # Chaining operations
>>> db.create_table("orders", [column("id", "INTEGER"), column("amount", "REAL")]).collect()
>>> _ = Records(_data=[{"id": 1, "amount": 150.0}, {"id": 2, "amount": 50.0}], _database=db).insert_into("orders")
>>> df3 = db.sql("SELECT * FROM orders").where(col("amount") > 100).limit(1)
>>> results3 = df3.collect()
>>> len(results3)
1
>>> results3[0]["amount"]
150.0
>>> db.close()
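The `:param_name` convention with keyword arguments can be shown in isolation using the standard-library `sqlite3` module, which accepts the same placeholder style. `run_query` is a hypothetical helper written for this sketch, not a Moltres function; it illustrates why bound parameters are safer than string formatting.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)", [(1, "Alice", 25), (2, "Bob", 17)]
)


def run_query(sql: str, **params: object) -> list:
    """Hypothetical helper: forward keyword arguments as named bind parameters."""
    return conn.execute(sql, params).fetchall()


by_id = run_query("SELECT name FROM users WHERE id = :id", id=1)

# The value is bound as data, so an injection-style string matches nothing
# instead of altering the query.
hostile = run_query(
    "SELECT name FROM users WHERE name = :name", name="x' OR '1'='1"
)
```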
- table(name: str) -> TableHandle
- table(model_class: Type['DeclarativeBase']) -> TableHandle
Get a handle to a table in the database.
Delegates to TableManager.
- Parameters:
name_or_model – Name of the table, SQLAlchemy model class, or SQLModel model class
- Returns:
TableHandle for the specified table
- Raises:
ValidationError – If table name is invalid
ValueError – If model_class is not a valid SQLAlchemy or SQLModel model
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> _ = db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice"}], _database=db).insert_into("users")
>>> # Get table handle by name
>>> users = db.table("users")
>>> df = users.select("id", "name")
>>> results = df.collect()
>>> results[0]["name"]
'Alice'
>>> db.close()
- tables(schema: str | None = None) -> Dict[str, List[ColumnDef]]
Get all tables in the database with their schemas.
- Parameters:
schema – Optional schema name (for databases that support schemas)
- Returns:
Dictionary mapping table names to their column definitions
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER")]).collect()
>>> db.create_table("orders", [column("id", "INTEGER")]).collect()
>>> tables = db.tables()
>>> "users" in tables
True
>>> "orders" in tables
True
>>> len(tables["users"])
1
- transaction(savepoint: bool = False, readonly: bool = False, isolation_level: str | None = None, timeout: float | None = None) -> Iterator[Transaction]
Create a transaction context for grouping multiple operations.
All operations within the transaction context share the same transaction. If any exception occurs, the transaction is automatically rolled back. Otherwise, it is committed on successful exit.
- Parameters:
savepoint – If True and a transaction is already active, create a savepoint instead of raising an error. This enables nested transactions.
readonly – If True, set the transaction to read-only mode. Prevents writes.
isolation_level – Optional isolation level. Must be one of:
- "READ UNCOMMITTED"
- "READ COMMITTED"
- "REPEATABLE READ"
- "SERIALIZABLE"
timeout – Optional transaction timeout in seconds. Database-specific behavior:
- PostgreSQL: Sets statement_timeout (milliseconds)
- MySQL: Sets innodb_lock_wait_timeout (seconds)
- SQLite: Not supported
- Yields:
Transaction object that can be used for explicit commit/rollback
Example
Basic transaction:
>>> with db.transaction() as txn:
...     df.write.insertInto("table")
...     df.write.update("table", where=..., set={...})
...     # If any operation fails, all are rolled back
...     # Otherwise, all are committed on exit
Nested transaction with savepoint:
>>> with db.transaction() as outer:
...     # ... operations ...
...     with db.transaction(savepoint=True) as inner:
...         # ... operations that can be rolled back independently ...
...         inner.savepoint("checkpoint")
...         # ... operations ...
...         inner.rollback_to_savepoint("checkpoint")
...     # outer transaction continues...
Read-only transaction:
>>> with db.transaction(readonly=True) as txn:
...     results = db.table("users").select().collect()
Transaction with isolation level:
>>> with db.transaction(isolation_level="SERIALIZABLE") as txn:
...     # ... critical operations requiring highest isolation ...
...     pass
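The savepoint behavior in the nested example can be seen at the SQL level. The sketch below uses the standard-library `sqlite3` module with manual transaction control; the savepoint name `sp_checkpoint` is arbitrary, and this shows the underlying database mechanism rather than how the Transaction class is implemented.

```python
import sqlite3

# isolation_level=None disables implicit transactions so BEGIN/COMMIT are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, note TEXT)")

conn.execute("BEGIN")
conn.execute("INSERT INTO events (note) VALUES ('kept')")

conn.execute("SAVEPOINT sp_checkpoint")
conn.execute("INSERT INTO events (note) VALUES ('discarded')")
# Undo only the work done since the savepoint; the outer transaction stays open.
conn.execute("ROLLBACK TO SAVEPOINT sp_checkpoint")
conn.execute("RELEASE SAVEPOINT sp_checkpoint")

conn.execute("COMMIT")

notes = [note for (note,) in conn.execute("SELECT note FROM events ORDER BY id")]
```

Only the row inserted before the savepoint survives the commit, which mirrors the "rolled back independently" comment in the nested example.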
- update(table_name: str, *, where: Column, set: Mapping[str, object]) -> int
Update rows in a table.
Convenience method for updating data in a table.
- Parameters:
table_name – Name of the table to update
where – Column expression for the WHERE clause
set – Dictionary of column names to new values
- Returns:
Number of rows updated
- Raises:
ValidationError – If table name is invalid or set dictionary is empty
Example
>>> from moltres import connect, col
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice"}], _database=db).insert_into("users")
>>> # Update rows
>>> count = db.update("users", where=col("id") == 1, set={"name": "Alice Updated"})
>>> count
1
>>> # Verify update
>>> df = db.table("users").select()
>>> results = df.collect()
>>> results[0]["name"]
'Alice Updated'
>>> db.close()
- class moltres.table.table.TableHandle(name: str, database: Database, model: Type[Any] | None = None)
Bases: object
Lightweight handle representing a table reference.
A TableHandle provides access to a specific table in the database. It can be created from a table name or from a SQLModel/Pydantic model class.
- model
Optional SQLModel, Pydantic, or SQLAlchemy model class
- Type:
Type[Any] | None
Example
>>> db = connect("sqlite:///:memory:")
>>> handle = db.table("users")
>>> df = handle.select()
- columns() -> List[str]
Get the list of column names for this table.
- Returns:
List of column names
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
>>> handle = db.table("users")
>>> cols = handle.columns()
>>> "id" in cols
True
>>> "name" in cols
True
- property model_class: Type['DeclarativeBase'] | None
Get the SQLAlchemy model class if this handle was created from a model.
- Returns:
SQLAlchemy model class or None if handle was created from table name
- pandas(*columns: str) PandasDataFrame[source]
Create a PandasDataFrame from this table.
- Parameters:
*columns – Optional column names to select
- Returns:
PandasDataFrame for pandas-style operations
- Return type:
PandasDataFrame
Example
>>> df = db.table('users').pandas()
>>> df = db.table('users').pandas('id', 'name')
- polars(*columns: str) PolarsDataFrame[source]
Create a PolarsDataFrame from this table.
- Parameters:
*columns – Optional column names to select
- Returns:
PolarsDataFrame for Polars-style operations
- Return type:
PolarsDataFrame
Example
>>> df = db.table('users').polars()
>>> df = db.table('users').polars('id', 'name')
- select(*columns: str) DataFrame[source]
Select columns from this table.
- Parameters:
*columns – Optional column names to select. If empty, selects all columns.
- Returns:
DataFrame with selected columns
- Return type:
DataFrame
Example
>>> db = connect("sqlite:///:memory:")
>>> handle = db.table("users")
>>> df = handle.select("id", "name")
- class moltres.table.table.Transaction(database: Database, connection: sqlalchemy.engine.Connection, readonly: bool = False, isolation_level: str | None = None, is_savepoint: bool = False)[source]
Bases: object
Transaction context for grouping multiple operations.
- commit() None[source]
Explicitly commit the transaction.
- Raises:
RuntimeError – If the transaction has already been committed or rolled back
- is_active() bool[source]
Check if this transaction is still active.
- Returns:
True if the transaction is active (not committed or rolled back), False otherwise
- is_readonly() bool[source]
Check if this transaction is read-only.
- Returns:
True if the transaction is read-only, False otherwise
- isolation_level() str | None[source]
Get the transaction isolation level.
- Returns:
Isolation level string or None if not set
- release_savepoint(name: str) None[source]
Release a savepoint.
- Parameters:
name – Savepoint name to release
- Raises:
RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist
- rollback() None[source]
Explicitly rollback the transaction.
- Raises:
RuntimeError – If the transaction has already been committed or rolled back
- rollback_to_savepoint(name: str) None[source]
Rollback to a specific savepoint.
- Parameters:
name – Savepoint name to rollback to
- Raises:
RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist
- savepoint(name: str | None = None) str[source]
Create a savepoint within this transaction.
- Parameters:
name – Optional savepoint name. If not provided, a unique name is generated.
- Returns:
The savepoint name (generated or provided)
- Raises:
RuntimeError – If the transaction has already been committed or rolled back
ValueError – If savepoints are not supported by the dialect
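The savepoint methods above correspond to the SQL statements SAVEPOINT, ROLLBACK TO SAVEPOINT, and RELEASE SAVEPOINT. The sketch below issues those statements directly through the standard-library sqlite3 module to show the effect; it illustrates the underlying SQL under the assumption of a SQLite backend and is not the Transaction class itself. The savepoint name `sp1` is arbitrary.

```python
import sqlite3

# isolation_level=None disables sqlite3's implicit transaction handling,
# so BEGIN, SAVEPOINT, and COMMIT are issued explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE events (id INTEGER)")

conn.execute("BEGIN")
conn.execute("INSERT INTO events VALUES (1)")
conn.execute("SAVEPOINT sp1")
conn.execute("INSERT INTO events VALUES (2)")
conn.execute("ROLLBACK TO SAVEPOINT sp1")  # undoes row 2 only
conn.execute("RELEASE SAVEPOINT sp1")      # discards the savepoint marker
conn.execute("COMMIT")

ids = [row[0] for row in conn.execute("SELECT id FROM events ORDER BY id")]
conn.close()
```

Rolling back to the savepoint leaves the enclosing transaction open, which is why row 1 still commits.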
AsyncTableHandle
Async table access primitives.
- class moltres.table.async_table.AsyncDatabase(config: MoltresConfig)[source]
Bases: object
Entry-point object returned by moltres.async_connect.
The AsyncDatabase class supports the async context manager protocol for automatic connection cleanup. Use it in an async with statement to ensure the connection is properly closed.
Example
Using async context manager (recommended):
>>> async with async_connect("sqlite+aiosqlite:///:memory:") as db:
...     df = db.sql("SELECT * FROM users")
...     results = await df.collect()
...     # await db.close() called automatically on exit
- property connection_manager: AsyncConnectionManager
- async createDataFrame(data: Sequence[dict[str, Any]] | Sequence[tuple] | 'AsyncRecords' | 'AsyncLazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) AsyncDataFrame[source]
Create an AsyncDataFrame from Python data (list of dicts, list of tuples, AsyncRecords, AsyncLazyRecords, pandas DataFrame, polars DataFrame, or polars LazyFrame).
Creates a temporary table, inserts the data, and returns an AsyncDataFrame querying from that table. If AsyncLazyRecords is provided, it will be auto-materialized. If a pandas/polars DataFrame or LazyFrame is provided, it will be converted to Records with lazy conversion.
- Parameters:
data – Input data in one of the supported formats:
- List of dicts: [{“col1”: val1, “col2”: val2}, …]
- List of tuples: Requires schema parameter with column names
- AsyncRecords object: Extracts data and schema if available
- AsyncLazyRecords object: Auto-materializes and extracts data and schema
- pandas DataFrame: Converts to Records with schema preservation
- polars DataFrame: Converts to Records with schema preservation
- polars LazyFrame: Materializes and converts to Records with schema preservation
schema – Optional explicit schema. If not provided, schema is inferred from data.
pk – Optional column name(s) to mark as primary key. Can be a single string or sequence of strings for composite keys.
auto_pk – Optional column name(s) to create as auto-incrementing primary key. Can specify same name as pk to make an existing column auto-incrementing.
- Returns:
AsyncDataFrame querying from the created temporary table
- Raises:
ValueError – If data is empty and no schema provided, or if primary key requirements are not met
ValidationError – If list of tuples provided without schema, or other validation errors
Example
>>> # Create AsyncDataFrame from list of dicts
>>> df = await db.createDataFrame([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], pk="id")
>>> # Create AsyncDataFrame with auto-incrementing primary key
>>> df = await db.createDataFrame([{"name": "Alice"}, {"name": "Bob"}], auto_pk="id")
>>> # Create AsyncDataFrame from AsyncLazyRecords (auto-materializes)
>>> lazy_records = db.read.records.csv("data.csv")
>>> df = await db.createDataFrame(lazy_records, pk="id")
- async create_dataframe(data: Sequence[dict[str, object]] | Sequence[tuple] | 'AsyncRecords' | 'AsyncLazyRecords' | 'pd.DataFrame' | 'pl.DataFrame' | 'pl.LazyFrame', schema: Sequence[ColumnDef] | None = None, pk: str | Sequence[str] | None = None, auto_pk: str | Sequence[str] | None = None) AsyncDataFrame[source]
Create an AsyncDataFrame from Python data (snake_case alias for createDataFrame).
This is an alias for createDataFrame(). See createDataFrame() for full documentation.
- Parameters:
data – Input data in one of supported formats
schema – Optional explicit schema
pk – Optional column name(s) to mark as primary key
auto_pk – Optional column name(s) to create as auto-incrementing primary key
- Returns:
AsyncDataFrame querying from the created temporary table
- create_index(name: str, table: str, columns: str | Sequence[str], *, unique: bool = False, if_not_exists: bool = True) AsyncCreateIndexOperation[source]
Create a lazy async create index operation.
- Parameters:
name – Name of the index to create
table – Name of the table to create the index on
columns – Column name(s) to index (single string or sequence)
unique – If True, create a UNIQUE index (default: False)
if_not_exists – If True, don’t error if index already exists (default: True)
- Returns:
AsyncCreateIndexOperation that executes on collect()
Example
>>> op = db.create_index("idx_email", "users", "email")
>>> await op.collect()  # Executes the CREATE INDEX
>>> # Multi-column index
>>> op2 = db.create_index("idx_name_age", "users", ["name", "age"], unique=True)
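To make the `unique` and `if_not_exists` flags concrete, the following sketch runs the corresponding SQL against SQLite via the standard-library sqlite3 module. The exact statement Moltres generates is an assumption here; the point is only the behavior of UNIQUE and IF NOT EXISTS.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, email TEXT)")
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_email ON users (email)")
# Re-running the statement is a no-op because of IF NOT EXISTS.
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_email ON users (email)")

conn.execute("INSERT INTO users VALUES (1, 'a@example.com')")
try:
    conn.execute("INSERT INTO users VALUES (2, 'a@example.com')")
    duplicate_rejected = False
except sqlite3.IntegrityError:
    # The UNIQUE index rejects a second row with the same email.
    duplicate_rejected = True

# PRAGMA index_list rows are (seq, name, unique, origin, partial).
index_names = [row[1] for row in conn.execute("PRAGMA index_list(users)")]
conn.close()
```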
- create_table(name: str, columns: Sequence[ColumnDef], *, if_not_exists: bool = True, temporary: bool = False, constraints: Sequence['UniqueConstraint' | 'CheckConstraint' | 'ForeignKeyConstraint'] | None = None) AsyncCreateTableOperation[source]
- create_table(model_class: Type['DeclarativeBase'], *, if_not_exists: bool = True, temporary: bool = False) AsyncCreateTableOperation
Create a lazy async create table operation.
- Parameters:
name_or_model – Name of the table to create, or SQLAlchemy model class
columns – Sequence of ColumnDef objects defining the table schema (required if name_or_model is str)
if_not_exists – If True, don’t error if table already exists (default: True)
temporary – If True, create a temporary table (default: False)
constraints – Optional sequence of constraint objects (UniqueConstraint, CheckConstraint, ForeignKeyConstraint). Ignored if model_class is provided (constraints are extracted from model).
- Returns:
AsyncCreateTableOperation that executes on collect()
Example
>>> import asyncio
>>> from moltres import async_connect
>>> from moltres.table.schema import column, unique, check
>>> async def example():
...     db = await async_connect("sqlite+aiosqlite:///:memory:")
...     # Create table with schema
...     op = db.create_table(
...         "users",
...         [column("id", "INTEGER", primary_key=True), column("email", "TEXT")],
...         constraints=[unique("email"), check("id > 0", name="ck_positive_id")]
...     )
...     table = await op.collect()  # Executes the CREATE TABLE
...     # Verify table was created
...     tables = await db.get_table_names()
...     "users" in tables
...     True
...     await db.close()
...     # asyncio.run(example())
- Raises:
ValidationError – If table name or columns are invalid
ValueError – If model_class is not a valid SQLAlchemy model
Example
>>> op = db.create_table("users", [column("id", "INTEGER")])
>>> table = await op.collect()  # Executes the CREATE TABLE
>>> # Or with SQLAlchemy model
>>> from sqlalchemy import Column, Integer
>>> from sqlalchemy.orm import DeclarativeBase
>>> class Base(DeclarativeBase):
...     pass
>>> class User(Base):
...     __tablename__ = "users"
...     id = Column(Integer, primary_key=True)
>>> op = db.create_table(User)
>>> table = await op.collect()
- property dialect: DialectSpec
- drop_index(name: str, table: str | None = None, *, if_exists: bool = True) AsyncDropIndexOperation[source]
Create a lazy async drop index operation.
- Parameters:
name – Name of the index to drop
table – Optional table name (required for some dialects like MySQL)
if_exists – If True, don’t error if index doesn’t exist (default: True)
- Returns:
AsyncDropIndexOperation that executes on collect()
Example
>>> op = db.drop_index("idx_email", "users")
>>> await op.collect()  # Executes the DROP INDEX
- drop_table(name: str, *, if_exists: bool = True) AsyncDropTableOperation[source]
Create a lazy async drop table operation.
- Parameters:
name – Name of the table to drop
if_exists – If True, don’t error if table doesn’t exist (default: True)
- Returns:
AsyncDropTableOperation that executes on collect()
Example
>>> op = db.drop_table("users")
>>> await op.collect()  # Executes the DROP TABLE
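The DDL methods above return operation objects that do nothing until collect() is awaited. The sketch below shows that lazy pattern in isolation using only asyncio. `LazyOperation` and `drop_users` are hypothetical names invented for illustration and are not part of Moltres.

```python
import asyncio
from typing import Awaitable, Callable, List


class LazyOperation:
    """Hypothetical stand-in for a lazy DDL operation object."""

    def __init__(self, action: Callable[[], Awaitable[str]]) -> None:
        self._action = action

    async def collect(self) -> str:
        # The side effect happens here, not at construction time.
        return await self._action()


executed: List[str] = []


async def drop_users() -> str:
    # Records the statement instead of talking to a real database.
    executed.append("DROP TABLE IF EXISTS users")
    return "dropped"


async def main() -> str:
    op = LazyOperation(drop_users)
    assert executed == []  # building the operation runs nothing
    return await op.collect()


result = asyncio.run(main())
```

Deferring execution this way lets an operation be built in one place and run later, for example inside a transaction.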
- async execute_plan(plan: LogicalPlan, model: Type[Any] | None = None) AsyncQueryResult[source]
Execute a logical plan and return results.
- async execute_plan_stream(plan: LogicalPlan) AsyncIterator[List[Dict[str, object]]][source]
Execute a plan and return an async iterator of row chunks.
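An async iterator of row chunks can be consumed with `async for`. The sketch below imitates the shape of such a stream with a plain async generator. `stream_chunks` is a hypothetical function, and the in-memory row list stands in for a real driver cursor.

```python
import asyncio
from typing import AsyncIterator, Dict, List


async def stream_chunks(rows: List[Dict[str, object]],
                        chunk_size: int) -> AsyncIterator[List[Dict[str, object]]]:
    """Hypothetical generator yielding fixed-size chunks of rows."""
    for start in range(0, len(rows), chunk_size):
        await asyncio.sleep(0)  # yield control, as a real fetch would
        yield rows[start:start + chunk_size]


async def main() -> List[int]:
    rows = [{"id": i} for i in range(5)]
    # Each iteration receives one list of row dictionaries.
    return [len(chunk) async for chunk in stream_chunks(rows, 2)]


chunk_sizes = asyncio.run(main())
```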
- async execute_sql(sql: str, params: Dict[str, Any] | None = None) AsyncQueryResult[source]
Execute raw SQL and return results.
- property executor: AsyncQueryExecutor
- classmethod from_async_connection(connection: sqlalchemy.ext.asyncio.engine.AsyncConnection, **options: object) AsyncDatabase[source]
Create an AsyncDatabase instance from an existing SQLAlchemy AsyncConnection.
This allows you to use Moltres with an existing SQLAlchemy AsyncConnection, enabling integration within existing async transactions.
Note: The AsyncDatabase will use the AsyncConnection’s engine, but will not manage the AsyncConnection’s lifecycle. The user is responsible for managing the connection.
- Parameters:
connection – SQLAlchemy AsyncConnection instance
**options – Optional configuration parameters (same as from_async_engine)
- Returns:
AsyncDatabase instance configured to use the AsyncConnection’s engine
Example
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from moltres import AsyncDatabase
>>> engine = create_async_engine("sqlite+aiosqlite:///:memory:")
>>> async with engine.connect() as conn:
...     db = AsyncDatabase.from_async_connection(conn)
...     # Use Moltres within the connection's transaction
...     from moltres.table.schema import column
...     await db.create_table("users", [column("id", "INTEGER")]).collect()
- classmethod from_async_engine(engine: AsyncEngine, **options: object) AsyncDatabase[source]
Create an AsyncDatabase instance from an existing SQLAlchemy AsyncEngine.
This allows you to use Moltres with an existing SQLAlchemy AsyncEngine, enabling integration with existing async SQLAlchemy projects.
- Parameters:
engine – SQLAlchemy AsyncEngine instance
**options – Optional configuration parameters:
- echo: Enable SQLAlchemy echo mode
- fetch_format: Result format: “records”, “pandas”, or “polars”
- dialect: Override SQL dialect detection
- query_timeout: Query execution timeout in seconds
- Other options are stored in config.options
- Returns:
AsyncDatabase instance configured to use the provided AsyncEngine
Example
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from moltres import AsyncDatabase
>>> engine = create_async_engine("sqlite+aiosqlite:///:memory:")
>>> db = AsyncDatabase.from_async_engine(engine)
>>> # Now use Moltres with your existing async engine
>>> from moltres.table.schema import column
>>> await db.create_table("users", [column("id", "INTEGER")]).collect()
- classmethod from_async_session(session: AsyncSession, **options: object) AsyncDatabase[source]
Create an AsyncDatabase instance from a SQLAlchemy AsyncSession.
This allows you to use Moltres with an existing SQLAlchemy AsyncSession, enabling integration with async ORM-based applications.
Note: The AsyncDatabase will use the AsyncSession’s bind/engine, but will not manage the AsyncSession’s lifecycle. The user is responsible for managing the session.
- Parameters:
session – SQLAlchemy AsyncSession instance
**options – Optional configuration parameters (same as from_async_engine)
- Returns:
AsyncDatabase instance configured to use the AsyncSession’s bind/engine
Example
>>> from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
>>> from moltres import AsyncDatabase
>>> engine = create_async_engine("sqlite+aiosqlite:///:memory:")
>>> AsyncSession = async_sessionmaker(bind=engine)
>>> async with AsyncSession() as session:
...     db = AsyncDatabase.from_async_session(session)
...     # Use Moltres with your existing async session
...     from moltres.table.schema import column
...     await db.create_table("users", [column("id", "INTEGER")]).collect()
- async get_columns(table_name: str) List['ColumnInfo'][source]
Get column information for a table.
- Parameters:
table_name – Name of the table to inspect
- Returns:
List of ColumnInfo objects with column metadata
- Raises:
ValidationError – If table name is invalid
ValueError – If database connection is not available
RuntimeError – If table does not exist or cannot be inspected
Example
>>> columns = await db.get_columns("users")
>>> # Returns: [ColumnInfo(name='id', type_name='INTEGER', ...), ...]
- async get_table_names(schema: str | None = None) List[str][source]
Get list of table names in the database.
- Parameters:
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
- Returns:
List of table names
- Raises:
ValueError – If database connection is not available
RuntimeError – If inspection fails
Example
>>> import asyncio
>>> from moltres import async_connect
>>> from moltres.table.schema import column
>>> async def example():
...     db = await async_connect("sqlite+aiosqlite:///:memory:")
...     await db.create_table("users", [column("id", "INTEGER")]).collect()
...     await db.create_table("orders", [column("id", "INTEGER")]).collect()
...     # Get all table names
...     tables = await db.get_table_names()
...     "users" in tables
...     True
...     "orders" in tables
...     True
...     await db.close()
...     # asyncio.run(example())
- get_transaction_status() dict[str, object] | None[source]
Get transaction status and metadata if a transaction is active.
- Returns:
Dictionary with transaction metadata, or None if no transaction is active. The dictionary includes:
- readonly: Whether the transaction is read-only
- isolation_level: Transaction isolation level (if set)
- timeout: Transaction timeout in seconds (if set)
- savepoints: List of active savepoint names
- Return type:
dict[str, object] | None
Example
>>> async with db.transaction(isolation_level="SERIALIZABLE", readonly=True) as txn:
...     status = db.get_transaction_status()
...     assert status["isolation_level"] == "SERIALIZABLE"
...     assert status["readonly"] is True
- async get_view_names(schema: str | None = None) List[str][source]
Get list of view names in the database.
- Parameters:
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
- Returns:
List of view names
- Raises:
ValueError – If database connection is not available
RuntimeError – If inspection fails
Example
>>> views = await db.get_view_names()
>>> # Returns: ['active_users_view', 'order_summary_view']
- is_in_transaction() bool[source]
Check if currently in a transaction.
- Returns:
True if a transaction is active, False otherwise
Example
>>> if db.is_in_transaction():
...     status = db.get_transaction_status()
...     print(f"Isolation: {status['isolation_level']}")
- property load: AsyncDataLoader
Return an AsyncDataLoader for loading data from files and tables as AsyncDataFrames.
Note: For SQL operations on tables, await the table handle first and then build the query, e.g. (await db.table(name)).select().
- property read: AsyncReadAccessor
Return an AsyncReadAccessor for accessing read operations.
Use await db.read.records.* for AsyncRecords-based reads (backward compatibility). Use db.load.* for AsyncDataFrame-based reads (PySpark-style).
- async reflect(schema: str | None = None, views: bool = False) Dict[str, 'TableSchema'][source]
Reflect entire database schema.
- Parameters:
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
views – If True, also reflect views (default: False)
- Returns:
Dictionary mapping table/view names to TableSchema objects
- Raises:
ValueError – If database connection is not available
RuntimeError – If reflection fails
Example
>>> schemas = await db.reflect()
>>> # Returns: {'users': TableSchema(...), 'orders': TableSchema(...)}
- async reflect_table(name: str, schema: str | None = None) TableSchema[source]
Reflect a single table from the database.
- Parameters:
name – Name of the table to reflect
schema – Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema.
- Returns:
TableSchema object with table metadata
- Raises:
ValidationError – If table name is invalid
ValueError – If database connection is not available
RuntimeError – If table does not exist or reflection fails
Example
>>> schema = await db.reflect_table("users")
>>> # Returns: TableSchema(name='users', columns=[ColumnDef(...), ...])
- async scan_csv(path: str, schema: Sequence['ColumnDef'] | None = None, **options: object) AsyncPolarsDataFrame[source]
Scan a CSV file as an AsyncPolarsDataFrame (Polars-style).
- Parameters:
path – Path to the CSV file
schema – Optional explicit schema
**options – Format-specific options (e.g., header=True, delimiter=”,”)
- Returns:
AsyncPolarsDataFrame containing the CSV data (lazy)
Example
>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> df = await db.scan_csv("data.csv", header=True)
>>> results = await df.collect()
- async scan_json(path: str, schema: Sequence['ColumnDef'] | None = None, **options: object) AsyncPolarsDataFrame[source]
Scan a JSON file (array of objects) as an AsyncPolarsDataFrame (Polars-style).
- Parameters:
path – Path to the JSON file
schema – Optional explicit schema
**options – Format-specific options (e.g., multiline=True)
- Returns:
AsyncPolarsDataFrame containing the JSON data (lazy)
Example
>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> df = await db.scan_json("data.json")
>>> results = await df.collect()
- async scan_jsonl(path: str, schema: Sequence['ColumnDef'] | None = None, **options: object) AsyncPolarsDataFrame[source]
Scan a JSONL file (one JSON object per line) as an AsyncPolarsDataFrame (Polars-style).
- Parameters:
path – Path to the JSONL file
schema – Optional explicit schema
**options – Format-specific options
- Returns:
AsyncPolarsDataFrame containing the JSONL data (lazy)
Example
>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> df = await db.scan_jsonl("data.jsonl")
>>> results = await df.collect()
- async scan_parquet(path: str, schema: Sequence['ColumnDef'] | None = None, **options: object) AsyncPolarsDataFrame[source]
Scan a Parquet file as an AsyncPolarsDataFrame (Polars-style).
- Parameters:
path – Path to the Parquet file
schema – Optional explicit schema
**options – Format-specific options
- Returns:
AsyncPolarsDataFrame containing the Parquet data (lazy)
- Raises:
RuntimeError – If pandas or pyarrow are not installed
Example
>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> df = await db.scan_parquet("data.parquet")
>>> results = await df.collect()
- async scan_text(path: str, column_name: str = 'value', schema: Sequence['ColumnDef'] | None = None, **options: object) AsyncPolarsDataFrame[source]
Scan a text file as a single column AsyncPolarsDataFrame (Polars-style).
- Parameters:
path – Path to the text file
column_name – Name of the column to create (default: “value”)
schema – Optional explicit schema
**options – Format-specific options
- Returns:
AsyncPolarsDataFrame containing the text file lines (lazy)
Example
>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> df = await db.scan_text("data.txt", column_name="line")
>>> results = await df.collect()
- sql(sql: str, **params: object) AsyncDataFrame[source]
Execute a SQL query and return an AsyncDataFrame.
Similar to PySpark’s spark.sql(), this method accepts a raw SQL string and returns a lazy AsyncDataFrame that can be chained with further operations. The SQL dialect is determined by the database connection.
- Parameters:
sql – SQL query string to execute
**params – Optional named parameters for parameterized queries. Use :param_name syntax in SQL and pass values as kwargs.
- Returns:
Lazy AsyncDataFrame that can be chained with further operations
Example
>>> import asyncio
>>> from moltres import async_connect, col
>>> from moltres.table.schema import column
>>> async def example():
...     db = await async_connect("sqlite+aiosqlite:///:memory:")
...     await db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT"), column("age", "INTEGER")]).collect()
...     from moltres.io.records import AsyncRecords
...     records = AsyncRecords(_data=[{"id": 1, "name": "Alice", "age": 25}, {"id": 2, "name": "Bob", "age": 17}], _database=db)
...     await records.insert_into("users")
...     # Basic SQL query
...     df = db.sql("SELECT * FROM users WHERE age > 18")
...     results = await df.collect()
...     len(results)
...     1
...     # Parameterized query
...     df2 = db.sql("SELECT * FROM users WHERE id = :id AND name = :name", id=1, name="Alice")
...     results2 = await df2.collect()
...     results2[0]["name"]
...     'Alice'
...     await db.close()
...     # asyncio.run(example())
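The `:param_name` placeholder style mentioned above is also what the standard-library sqlite3 module accepts, so binding can be sketched without Moltres. This shows named parameter binding in general, not how `sql()` forwards its keyword arguments internally.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, age INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?, ?)",
                 [(1, "Alice", 25), (2, "Bob", 17)])
# Values are bound by name, never interpolated into the SQL string.
rows = conn.execute(
    "SELECT name FROM users WHERE id = :id AND name = :name",
    {"id": 1, "name": "Alice"},
).fetchall()
conn.close()
```

Binding values this way keeps user input out of the SQL text, which is the main reason to prefer parameters over string formatting.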
- async table(name: str) AsyncTableHandle[source]
- async table(model_class: Type['DeclarativeBase']) AsyncTableHandle
Get a handle to a table in the database.
- Parameters:
name_or_model – Name of the table, SQLAlchemy model class, or SQLModel model class
- Returns:
AsyncTableHandle for the specified table
- Raises:
ValidationError – If table name is invalid
ValueError – If model_class is not a valid SQLAlchemy or SQLModel model
Example
>>> import asyncio
>>> from moltres import async_connect
>>> from moltres.table.schema import column
>>> async def example():
...     db = await async_connect("sqlite+aiosqlite:///:memory:")
...     await db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect()
...     from moltres.io.records import AsyncRecords
...     records = AsyncRecords(_data=[{"id": 1, "name": "Alice"}], _database=db)
...     await records.insert_into("users")
...     # Get table handle
...     users = await db.table("users")
...     df = users.select("id", "name")
...     results = await df.collect()
...     results[0]["name"]
...     'Alice'
...     await db.close()
...     # asyncio.run(example())
- transaction(savepoint: bool = False, readonly: bool = False, isolation_level: str | None = None, timeout: float | None = None) AsyncIterator[AsyncTransaction][source]
Create an async transaction context for grouping multiple operations.
All operations within the transaction context share the same transaction. If any exception occurs, the transaction is automatically rolled back. Otherwise, it is committed on successful exit.
- Parameters:
savepoint – If True and a transaction is already active, create a savepoint instead of raising an error. This enables nested transactions.
readonly – If True, set the transaction to read-only mode. Prevents writes.
isolation_level – Optional isolation level. Must be one of:
- “READ UNCOMMITTED”
- “READ COMMITTED”
- “REPEATABLE READ”
- “SERIALIZABLE”
timeout – Optional transaction timeout in seconds. Database-specific behavior:
- PostgreSQL: Sets statement_timeout (milliseconds)
- MySQL: Sets innodb_lock_wait_timeout (seconds)
- SQLite: Not supported
- Yields:
AsyncTransaction object that can be used for explicit commit/rollback
Example
Basic transaction:
>>> async with db.transaction() as txn:
...     await df.write.insertInto("table")
...     await df.write.update("table", where=..., set={...})
...     # If any operation fails, all are rolled back
...     # Otherwise, all are committed on exit
Nested transaction with savepoint:
>>> async with db.transaction() as outer:
...     # ... operations ...
...     async with db.transaction(savepoint=True) as inner:
...         # ... operations that can be rolled back independently ...
...         await inner.savepoint("checkpoint")
...         # ... operations ...
...         await inner.rollback_to_savepoint("checkpoint")
...     # outer transaction continues...
Read-only transaction:
>>> async with db.transaction(readonly=True) as txn:
...     users = await db.table("users")
...     results = await users.select().collect()
Transaction with isolation level:
>>> async with db.transaction(isolation_level="SERIALIZABLE") as txn:
...     # ... critical operations requiring highest isolation ...
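The commit-on-success, rollback-on-exception behavior described for the transaction context can be sketched with a small context manager over the standard-library sqlite3 module. The version below is synchronous for brevity, and `transaction` here is a hypothetical helper, not the Moltres method of the same name.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def transaction(conn: sqlite3.Connection) -> Iterator[sqlite3.Connection]:
    """Hypothetical helper: commit on success, roll back on any exception."""
    conn.execute("BEGIN")
    try:
        yield conn
    except Exception:
        conn.execute("ROLLBACK")
        raise
    else:
        conn.execute("COMMIT")


# isolation_level=None lets the helper control BEGIN/COMMIT itself.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE accounts (id INTEGER, balance INTEGER)")

with transaction(conn):
    conn.execute("INSERT INTO accounts VALUES (1, 100)")  # committed

try:
    with transaction(conn):
        conn.execute("INSERT INTO accounts VALUES (2, 50)")
        raise RuntimeError("simulated failure")  # triggers the rollback
except RuntimeError:
    pass

ids = [row[0] for row in conn.execute("SELECT id FROM accounts")]
conn.close()
```

The exception is re-raised after the rollback so callers still see the original failure.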
- class moltres.table.async_table.AsyncTableHandle(name: str, database: AsyncDatabase, model: Type[Any] | None = None)[source]
Bases: object
Lightweight handle representing a table reference for async operations.
- database: AsyncDatabase
- property model_class: Type['DeclarativeBase'] | None
Get the SQLAlchemy model class if this handle was created from a model.
- Returns:
SQLAlchemy model class or None if handle was created from table name
- pandas() AsyncPandasDataFrame[source]
Create an AsyncPandasDataFrame from this table (Pandas-style entry point).
- Returns:
AsyncPandasDataFrame querying from this table
Example
>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> table = await db.table("users")
>>> df = table.pandas()
>>> results = await df.collect()
- polars() AsyncPolarsDataFrame[source]
Create an AsyncPolarsDataFrame from this table (Polars-style entry point).
- Returns:
AsyncPolarsDataFrame querying from this table
Example
>>> from moltres import async_connect
>>> db = await async_connect("sqlite+aiosqlite:///:memory:")
>>> table = await db.table("users")
>>> df = table.polars()
>>> results = await df.collect()
- class moltres.table.async_table.AsyncTransaction(database: AsyncDatabase, connection: sqlalchemy.ext.asyncio.engine.AsyncConnection, readonly: bool = False, isolation_level: str | None = None, is_savepoint: bool = False)[source]
Bases: object
Async transaction context for grouping multiple operations.
- is_active() bool[source]
Check if this transaction is still active.
- Returns:
True if the transaction is active (not committed or rolled back), False otherwise
- is_readonly() bool[source]
Check if this transaction is read-only.
- Returns:
True if the transaction is read-only, False otherwise
- isolation_level() str | None[source]
Get the transaction isolation level.
- Returns:
Isolation level string or None if not set
- async release_savepoint(name: str) None[source]
Release a savepoint.
- Parameters:
name – Savepoint name to release
- Raises:
RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist
- async rollback_to_savepoint(name: str) None[source]
Rollback to a specific savepoint.
- Parameters:
name – Savepoint name to rollback to
- Raises:
RuntimeError – If the transaction has already been committed or rolled back, or if the savepoint doesn’t exist
- async savepoint(name: str | None = None) str[source]
Create a savepoint within this transaction.
- Parameters:
name – Optional savepoint name. If not provided, a unique name is generated.
- Returns:
The savepoint name (generated or provided)
- Raises:
RuntimeError – If the transaction has already been committed or rolled back
ValueError – If savepoints are not supported by the dialect
Schema
Schema definition primitives for table creation.
- class moltres.table.schema.CheckConstraint(name: str | None = None, expression: str = '')[source]
Bases: object
Definition of a CHECK constraint.
- class moltres.table.schema.ColumnDef(name: str, type_name: str, nullable: bool = True, default: object | None = None, primary_key: bool = False, precision: int | None = None, scale: int | None = None)[source]
Bases: object
Definition of a single table column.
- class moltres.table.schema.ForeignKeyConstraint(name: str | None = None, columns: str | Sequence[str] = (), references_table: str = '', references_columns: str | Sequence[str] = (), on_delete: str | None = None, on_update: str | None = None)[source]
Bases: object
Definition of a FOREIGN KEY constraint.
- class moltres.table.schema.TableSchema(name: str, columns: Sequence[ColumnDef], if_not_exists: bool = True, temporary: bool = False, constraints: Sequence[UniqueConstraint | CheckConstraint | ForeignKeyConstraint] = ())[source]
Bases: object
Complete schema definition for a table.
- constraints: Sequence[UniqueConstraint | CheckConstraint | ForeignKeyConstraint] = ()
- class moltres.table.schema.UniqueConstraint(name: str | None = None, columns: str | Sequence[str] = ())[source]
Bases: object
Definition of a UNIQUE constraint.
- moltres.table.schema.check(expression: str, name: str | None = None) CheckConstraint[source]
Convenience helper for creating CHECK constraints.
- Parameters:
expression – SQL expression for the check constraint (e.g., “age > 0”)
name – Optional constraint name
- Returns:
CheckConstraint object
Example
>>> from moltres.table.schema import check
>>> ck = check("age >= 0 AND age <= 150", name="ck_valid_age")
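To show what such a constraint enforces once it reaches the database, the sketch below declares the same expression in raw SQL through the standard-library sqlite3 module. The DDL text is written by hand for illustration and is an assumption about what a CHECK constraint compiles to; the table name `people` is made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE people (age INTEGER, "
    "CONSTRAINT ck_valid_age CHECK (age >= 0 AND age <= 150))"
)
conn.execute("INSERT INTO people VALUES (30)")  # satisfies the constraint
try:
    conn.execute("INSERT INTO people VALUES (200)")
    rejected = False
except sqlite3.IntegrityError:
    # Rows violating the CHECK expression are refused by the database.
    rejected = True
conn.close()
```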
- moltres.table.schema.column(name: str, type_name: str, nullable: bool = True, default: object | None = None, primary_key: bool = False, precision: int | None = None, scale: int | None = None) ColumnDef[source]
Convenience helper for creating column definitions.
- Parameters:
name – Column name
type_name – SQL type name (e.g., “INTEGER”, “TEXT”, “REAL”, “DECIMAL”)
nullable – Whether the column allows NULL values (default: True)
default – Default value for the column (default: None)
primary_key – Whether this column is a primary key (default: False)
precision – Precision for DECIMAL/NUMERIC types (default: None)
scale – Scale for DECIMAL/NUMERIC types (default: None)
- Returns:
ColumnDef object for use in table creation
- Return type:
ColumnDef
Example
>>> from moltres import connect
>>> from moltres.table.schema import column
>>> db = connect("sqlite:///:memory:")
>>> # Create table with column definitions
>>> _ = db.create_table(
...     "users",
...     [
...         column("id", "INTEGER", primary_key=True),
...         column("name", "TEXT", nullable=False),
...         column("age", "INTEGER"),
...         column("balance", "DECIMAL", precision=10, scale=2)
...     ]
... ).collect()
>>> from moltres.io.records import Records
>>> _ = Records(_data=[{"id": 1, "name": "Alice", "age": 30, "balance": 100.50}], _database=db).insert_into("users")
>>> df = db.table("users").select()
>>> results = df.collect()
>>> results[0]["name"]
'Alice'
>>> results[0]["age"]
30
>>> db.close()
- moltres.table.schema.decimal(name: str, precision: int, scale: int = 0, nullable: bool = True, default: object | None = None, primary_key: bool = False) ColumnDef[source]
Convenience helper for creating DECIMAL/NUMERIC column definitions.
- Parameters:
name – Column name
precision – Total number of digits
scale – Number of digits after the decimal point
nullable – Whether the column can be NULL
default – Default value for the column
primary_key – Whether this column is a primary key
- Returns:
ColumnDef with type_name="DECIMAL" and precision/scale set
Example
>>> from moltres.table.schema import decimal
>>> col = decimal("price", precision=10, scale=2)  # DECIMAL(10, 2)
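As a rough illustration of what `precision` and `scale` mean, the sketch below models DECIMAL(10, 2) with Python's standard `decimal` module: at most 10 digits in total, 2 of them after the decimal point. The helpers `round_to_scale` and `fits_decimal` are hypothetical and are not part of Moltres; the half-up rounding mode is also an assumption, since rounding and overflow behavior vary by database.

```python
from decimal import Decimal, ROUND_HALF_UP


def round_to_scale(value: Decimal, scale: int) -> Decimal:
    """Round value to `scale` digits after the decimal point."""
    step = Decimal((0, (1,), -scale))  # e.g. scale=2 gives Decimal("0.01")
    return value.quantize(step, rounding=ROUND_HALF_UP)


def fits_decimal(value: Decimal, precision: int, scale: int) -> bool:
    """Check whether value, rounded to `scale`, needs at most `precision` digits."""
    rounded = round_to_scale(value, scale)
    return len(rounded.as_tuple().digits) <= precision


# Three fractional digits are rounded down to two.
price = round_to_scale(Decimal("19.999"), 2)

# With precision=10 and scale=2, eight digits remain for the integer part.
largest_ok = fits_decimal(Decimal("99999999.99"), 10, 2)
too_large = fits_decimal(Decimal("100000000.00"), 10, 2)
```

Note that importing the Moltres helper as `decimal` shadows the standard library module of the same name in that namespace, so an alias is needed when both are used together.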
- moltres.table.schema.foreign_key(columns: str | Sequence[str], references_table: str, references_columns: str | Sequence[str], name: str | None = None, on_delete: str | None = None, on_update: str | None = None) ForeignKeyConstraint[source]
Convenience helper for creating FOREIGN KEY constraints.
- Parameters:
columns – Column name(s) in this table
references_table – Name of the referenced table
references_columns – Column name(s) in the referenced table
name – Optional constraint name
on_delete – Optional action on delete (e.g., “CASCADE”, “SET NULL”, “RESTRICT”)
on_update – Optional action on update (e.g., “CASCADE”, “SET NULL”, “RESTRICT”)
- Returns:
ForeignKeyConstraint object
Example
>>> from moltres.table.schema import foreign_key
>>> # Single column foreign key
>>> fk1 = foreign_key("user_id", "users", "id", on_delete="CASCADE")
>>> # Multi-column foreign key
>>> fk2 = foreign_key(["order_id", "item_id"], "order_items", ["id", "id"])
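To show what `on_delete="CASCADE"` means in practice, here is a minimal sketch in plain `sqlite3` (not Moltres) with hand-written DDL. The `orders` table is an assumption made for the example. SQLite enforces foreign keys only when `PRAGMA foreign_keys` is switched on for the connection; whether Moltres does this automatically is not stated here.

```python
import sqlite3

# Plain sqlite3, not Moltres: hand-written DDL for illustration.
conn = sqlite3.connect(":memory:")
# SQLite enforces foreign keys only when this pragma is enabled per connection.
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute(
    "CREATE TABLE orders ("
    " id INTEGER PRIMARY KEY,"
    " user_id INTEGER,"
    " FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE"
    ")"
)
conn.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")
conn.execute("INSERT INTO orders (id, user_id) VALUES (10, 1)")

# Referencing a user that does not exist is rejected.
try:
    conn.execute("INSERT INTO orders (id, user_id) VALUES (11, 999)")
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True

# Deleting the parent row also removes its dependent orders.
conn.execute("DELETE FROM users WHERE id = 1")
remaining_orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
conn.close()
```

With "RESTRICT" in place of "CASCADE", the delete itself would fail while dependent rows exist; with "SET NULL", the `user_id` values would be cleared instead.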
- moltres.table.schema.json(name: str, nullable: bool = True, default: object | None = None, jsonb: bool = False) ColumnDef[source]
Convenience helper for creating JSON/JSONB column definitions.
- Parameters:
name – Column name
nullable – Whether the column can be NULL
default – Default value for the column
jsonb – If True, use JSONB (PostgreSQL only), otherwise use JSON
- Returns:
ColumnDef with type_name="JSONB" (PostgreSQL with jsonb=True), "JSON" (MySQL/PostgreSQL), or "TEXT" (SQLite)
Example
>>> from moltres.table.schema import json
>>> col = json("data")  # JSON type
>>> col2 = json("metadata", jsonb=True)  # JSONB type (PostgreSQL)
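On SQLite the column type falls back to TEXT, so JSON values are stored as serialized strings. The sketch below shows that round trip with the standard library only (`json` and `sqlite3`), without Moltres. The `events` table and payload are made up for the example, and whether Moltres serializes values automatically is not assumed.

```python
import json
import sqlite3

# Plain sqlite3, not Moltres: a JSON document stored in a TEXT column.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, data TEXT)")

payload = {"kind": "login", "tags": ["web", "mobile"], "attempts": 2}

# Serialize to a string on the way in.
conn.execute(
    "INSERT INTO events (id, data) VALUES (?, ?)",
    (1, json.dumps(payload)),
)

# The stored value is a plain string; parse it on the way out.
stored = conn.execute("SELECT data FROM events WHERE id = 1").fetchone()[0]
restored = json.loads(stored)
conn.close()
```

Because the Moltres helper is also named `json`, importing it alongside the standard library module in one namespace requires an alias for one of them.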
- moltres.table.schema.unique(columns: str | Sequence[str], name: str | None = None) UniqueConstraint[source]
Convenience helper for creating UNIQUE constraints.
- Parameters:
columns – Column name(s) for the unique constraint
name – Optional constraint name
- Returns:
UniqueConstraint object
Example
>>> from moltres.table.schema import unique
>>> # Single column unique constraint
>>> uq1 = unique("email")
>>> # Multi-column unique constraint
>>> uq2 = unique(["user_id", "session_id"], name="uq_user_session")
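A multi-column UNIQUE constraint applies to the combination of values, not to each column separately. The sketch below demonstrates this in plain `sqlite3` (not Moltres) with hand-written DDL; the `sessions` table is an assumption made for illustration.

```python
import sqlite3

# Plain sqlite3, not Moltres: hand-written DDL for illustration.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sessions ("
    " user_id INTEGER,"
    " session_id TEXT,"
    " CONSTRAINT uq_user_session UNIQUE (user_id, session_id)"
    ")"
)

conn.execute("INSERT INTO sessions VALUES (1, 'a')")
conn.execute("INSERT INTO sessions VALUES (1, 'b')")  # same user, new session: allowed
conn.execute("INSERT INTO sessions VALUES (2, 'a')")  # same session id, other user: allowed

# Repeating an existing (user_id, session_id) pair is rejected.
try:
    conn.execute("INSERT INTO sessions VALUES (1, 'a')")
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

session_count = conn.execute("SELECT COUNT(*) FROM sessions").fetchone()[0]
conn.close()
```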
- moltres.table.schema.uuid(name: str, nullable: bool = True, default: object | None = None, primary_key: bool = False) ColumnDef[source]
Convenience helper for creating UUID column definitions.
- Parameters:
name – Column name
nullable – Whether the column can be NULL
default – Default value for the column
primary_key – Whether this column is a primary key
- Returns:
ColumnDef with type_name="UUID" (PostgreSQL), "CHAR(36)" (MySQL), or "TEXT" (SQLite)
Example
>>> from moltres.table.schema import uuid
>>> col = uuid("id", primary_key=True)  # UUID type
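The "CHAR(36)" and "TEXT" fallbacks follow from the canonical string form of a UUID, which is 36 characters long. The sketch below stores a UUID as text in SQLite using only the standard library (`uuid` and `sqlite3`), not Moltres. The `items` table is made up for the example, and how Moltres itself converts UUID values is not assumed.

```python
import sqlite3
import uuid

# Plain sqlite3, not Moltres: a UUID stored in a TEXT primary key column.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id TEXT PRIMARY KEY, label TEXT)")

item_id = uuid.uuid4()
id_text = str(item_id)  # canonical form: 32 hex digits plus 4 hyphens

conn.execute("INSERT INTO items (id, label) VALUES (?, ?)", (id_text, "widget"))

# Read the text back and rebuild the UUID object.
stored = conn.execute("SELECT id FROM items").fetchone()[0]
restored = uuid.UUID(stored)
conn.close()
```

As with `json` and `decimal`, the Moltres helper shares its name with a standard library module, so one of the two needs an alias when both are imported into the same namespace.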