Source code for moltres.table.table

"""Table access primitives.

This module provides the core database and table access functionality:

- :class:`Database` - Main database connection and query interface
- :class:`TableHandle` - Lightweight reference to a database table
- :class:`Transaction` - Transaction context manager

The :class:`Database` class is the primary entry point for all database operations,
including table creation, querying, and data mutations.
"""

from __future__ import annotations

import atexit
from contextlib import contextmanager
import logging
import signal
from types import FrameType, TracebackType
import weakref
from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Union,
    cast,
    overload,
    Sequence,
    Type,
)

from ..config import MoltresConfig

if TYPE_CHECKING:
    import pandas as pd
    import polars as pl
    from sqlalchemy.engine import Engine
    from sqlalchemy.orm import DeclarativeBase, Session
    from sqlalchemy.sql import Select
    from ..dataframe.core.dataframe import DataFrame
    from ..dataframe.interfaces.pandas_dataframe import PandasDataFrame
    from ..dataframe.interfaces.polars_dataframe import PolarsDataFrame
    from ..dataframe.io.reader import DataLoader, ReadAccessor
    from ..expressions.column import Column
    from ..io.records import LazyRecords, Records
    from ..utils.inspector import ColumnInfo
    from .actions import (
        CreateIndexOperation,
        CreateTableOperation,
        DropIndexOperation,
        DropTableOperation,
    )
    from .batch import OperationBatch
    from .schema import (
        CheckConstraint,
        ForeignKeyConstraint,
        TableSchema,
        UniqueConstraint,
    )

    # Type alias for table name or model
    TableNameOrModel = Union[str, Type[DeclarativeBase], Type[Any]]
from sqlalchemy.engine import Connection

from ..engine.connection import ConnectionManager
from ..engine.dialects import DialectSpec, get_dialect
from ..engine.execution import QueryExecutor, QueryResult
from ..logical.plan import LogicalPlan
from ..sql.compiler import compile_plan
from .schema import ColumnDef

logger = logging.getLogger(__name__)
_ACTIVE_DATABASES: "weakref.WeakSet[Database]" = weakref.WeakSet()


[docs] @dataclass class TableHandle: """Lightweight handle representing a table reference. A :class:`TableHandle` provides access to a specific table in the database. It can be created from a table name or from a SQLModel/Pydantic model class. Attributes: name: Name of the table database: The :class:`Database` instance this handle belongs to model: Optional SQLModel, Pydantic, or SQLAlchemy model class Example: >>> db = connect("sqlite:///:memory:") >>> handle = db.table("users") >>> df = handle.select() """ name: str database: "Database" model: Optional[Type[Any]] = None # Can be SQLModel, Pydantic, or SQLAlchemy model def __repr__(self) -> str: """Return a user-friendly string representation of the TableHandle.""" if self.model: model_name = getattr(self.model, "__name__", str(self.model)) return f"TableHandle('{self.name}', model={model_name})" return f"TableHandle('{self.name}')" @property def model_class(self) -> Optional[Type["DeclarativeBase"]]: """Get the SQLAlchemy model class if this handle was created from a model. Returns: SQLAlchemy model class or None if handle was created from table name """ return self.model
[docs] def select(self, *columns: str) -> "DataFrame": """Select columns from this table. Args: *columns: Optional column names to select. If empty, selects all columns. Returns: :class:`DataFrame`: DataFrame with selected columns Example: >>> db = connect("sqlite:///:memory:") >>> handle = db.table("users") >>> df = handle.select("id", "name") """ from ..dataframe.core.dataframe import DataFrame return DataFrame.from_table(self, columns=list(columns))
[docs] def columns(self) -> List[str]: """Get the list of column names for this table. Returns: List of column names Example: >>> from moltres import connect >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect() >>> handle = db.table("users") >>> cols = handle.columns() >>> "id" in cols True >>> "name" in cols True """ column_infos = self.database.get_columns(self.name) return [col.name for col in column_infos]
[docs] def pandas(self, *columns: str) -> "PandasDataFrame": """Create a :class:`PandasDataFrame` from this table. Args: *columns: Optional column names to select Returns: :class:`PandasDataFrame`: :class:`PandasDataFrame` for pandas-style operations Example: >>> df = db.table('users').pandas() >>> df = db.table('users').pandas('id', 'name') """ from ..dataframe.interfaces.pandas_dataframe import PandasDataFrame from ..dataframe.core.dataframe import DataFrame df = DataFrame.from_table(self, columns=list(columns) if columns else None) return PandasDataFrame.from_dataframe(df)
[docs] def polars(self, *columns: str) -> "PolarsDataFrame": """Create a :class:`PolarsDataFrame` from this table. Args: *columns: Optional column names to select Returns: :class:`PolarsDataFrame`: :class:`PolarsDataFrame` for Polars-style operations Example: >>> df = db.table('users').polars() >>> df = db.table('users').polars('id', 'name') """ from ..dataframe.interfaces.polars_dataframe import PolarsDataFrame from ..dataframe.core.dataframe import DataFrame df = DataFrame.from_table(self, columns=list(columns) if columns else None) return PolarsDataFrame.from_dataframe(df)
[docs] class Transaction: """:class:`Transaction` context for grouping multiple operations.""" def __init__( self, database: "Database", connection: Connection, readonly: bool = False, isolation_level: Optional[str] = None, is_savepoint: bool = False, ): """Initialize a transaction context. Args: database: The database instance this transaction belongs to connection: The SQLAlchemy connection for this transaction readonly: Whether this transaction is read-only isolation_level: Transaction isolation level is_savepoint: Whether this transaction is actually a savepoint """ self.database = database self.connection = connection self._committed = False self._rolled_back = False self._readonly = readonly self._isolation_level = isolation_level self._is_savepoint = is_savepoint
[docs] def commit(self) -> None: """Explicitly commit the transaction. Raises: RuntimeError: If the transaction has already been committed or rolled back """ if self._committed or self._rolled_back: raise RuntimeError("Transaction already committed or rolled back") self.database.connection_manager.commit_transaction(self.connection) self._committed = True # Execute commit hooks from ..utils.transaction_hooks import _execute_hooks, _on_commit_hooks _execute_hooks(_on_commit_hooks, self)
[docs] def rollback(self) -> None: """Explicitly rollback the transaction. Raises: RuntimeError: If the transaction has already been committed or rolled back """ if self._committed or self._rolled_back: raise RuntimeError("Transaction already committed or rolled back") self.database.connection_manager.rollback_transaction(self.connection) self._rolled_back = True # Execute rollback hooks from ..utils.transaction_hooks import _execute_hooks, _on_rollback_hooks _execute_hooks(_on_rollback_hooks, self)
[docs] def savepoint(self, name: Optional[str] = None) -> str: """Create a savepoint within this transaction. Args: name: Optional savepoint name. If not provided, a unique name is generated. Returns: The savepoint name (generated or provided) Raises: RuntimeError: If the transaction has already been committed or rolled back ValueError: If savepoints are not supported by the dialect """ if self._committed or self._rolled_back: raise RuntimeError("Transaction already committed or rolled back") if name is None: name = self.database.connection_manager._generate_savepoint_name() self.database.connection_manager.create_savepoint(self.connection, name) return name
[docs] def rollback_to_savepoint(self, name: str) -> None: """Rollback to a specific savepoint. Args: name: Savepoint name to rollback to Raises: RuntimeError: If the transaction has already been committed or rolled back, or if the savepoint doesn't exist """ if self._committed or self._rolled_back: raise RuntimeError("Transaction already committed or rolled back") self.database.connection_manager.rollback_to_savepoint(self.connection, name)
[docs] def release_savepoint(self, name: str) -> None: """Release a savepoint. Args: name: Savepoint name to release Raises: RuntimeError: If the transaction has already been committed or rolled back, or if the savepoint doesn't exist """ if self._committed or self._rolled_back: raise RuntimeError("Transaction already committed or rolled back") self.database.connection_manager.release_savepoint(self.connection, name)
[docs] def is_readonly(self) -> bool: """Check if this transaction is read-only. Returns: True if the transaction is read-only, False otherwise """ return self._readonly
[docs] def isolation_level(self) -> Optional[str]: """Get the transaction isolation level. Returns: Isolation level string or None if not set """ return self._isolation_level
[docs] def is_active(self) -> bool: """Check if this transaction is still active. Returns: True if the transaction is active (not committed or rolled back), False otherwise """ return not self._committed and not self._rolled_back
def __enter__(self) -> "Transaction": """Enter the transaction context. Returns: :class:`Transaction`: This transaction instance """ # Execute begin hooks from ..utils.transaction_hooks import _execute_hooks, _on_begin_hooks _execute_hooks(_on_begin_hooks, self) # Start metrics tracking import time self._metrics_start_time = time.time() self._metrics_has_savepoint = self._is_savepoint self._metrics_readonly = self._readonly self._metrics_isolation_level = self._isolation_level return self def __exit__( self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: """Exit the transaction context. Automatically commits if no exception occurred, or rolls back if an exception was raised. Args: exc_type: Exception type if an exception occurred, None otherwise exc_val: Exception value if an exception occurred, None otherwise exc_tb: Exception traceback if an exception occurred, None otherwise """ if exc_type is not None: # Exception occurred, rollback if not self._rolled_back and not self._committed: self.rollback() else: # No exception, commit if not self._committed and not self._rolled_back: self.commit() # Record metrics after commit/rollback if hasattr(self, "_metrics_start_time"): import time from ..utils.transaction_metrics import get_transaction_metrics duration = time.time() - self._metrics_start_time committed = exc_type is None and not self._rolled_back metrics = get_transaction_metrics() metrics.record_transaction( duration=duration, committed=committed, has_savepoint=self._metrics_has_savepoint, readonly=self._metrics_readonly, isolation_level=self._metrics_isolation_level, error=exc_val if (exc_type and isinstance(exc_val, Exception)) else None, )
[docs] class Database: """Entry-point object returned by :func:`moltres.connect`. The :class:`Database` class provides the main interface for all database operations in Moltres. It handles connections, query execution, table management, and data mutations. The :class:`Database` class supports context manager protocol for automatic connection cleanup. Use it in a ``with`` statement to ensure the connection is properly closed. Attributes: config: The :class:`MoltresConfig` instance used for this database dialect: The SQL dialect being used (e.g., "sqlite", "postgresql") Example: Basic usage:: >>> from moltres import connect, col >>> db = connect("sqlite:///example.db") >>> df = db.table("users").select().where(col("active") == True) >>> results = df.collect() >>> db.close() Using context manager (recommended):: >>> with connect("sqlite:///example.db") as db: ... df = db.table("users").select().where(col("active") == True) ... results = df.collect() ... # db.close() called automatically on exit """ def __init__(self, config: MoltresConfig): self.config = config self._connections = ConnectionManager(config.engine) self._executor = QueryExecutor(self._connections, config.engine) self._dialect = get_dialect(self._dialect_name) self._ephemeral_tables: set[str] = set() self._closed = False _ACTIVE_DATABASES.add(self) def __repr__(self) -> str: """Return a user-friendly string representation of the Database.""" dialect_name = self._dialect_name status = "closed" if self._closed else "open" # Try to get DSN from config, but sanitize it dsn = None if self.config.engine.dsn: dsn = self.config.engine.dsn # Sanitize DSN to hide passwords if "://" in dsn: parts = dsn.split("://", 1) if "@" in parts[1]: # Has credentials - hide password scheme = parts[0] rest = parts[1] if "/" in rest: # Format: user:pass@host/db creds_and_host, db_part = rest.rsplit("/", 1) if "@" in creds_and_host: user_pass, host = creds_and_host.rsplit("@", 1) if ":" in user_pass: user, _ = user_pass.split(":", 1) dsn = f"{scheme}://{user}:***@{host}/{db_part}" else: dsn = f"{scheme}://{user_pass}@{host}/{db_part}" else: # No database part if "@" in rest: user_pass, host = rest.rsplit("@", 1) if ":" in user_pass: user, _ = user_pass.split(":", 1) dsn = f"{scheme}://{user}:***@{host}" else: dsn = f"{scheme}://{user_pass}@{host}" if dsn: return f"Database(dialect='{dialect_name}', dsn='{dsn}', status='{status}')" else: return f"Database(dialect='{dialect_name}', status='{status}')" @property def connection_manager(self) -> ConnectionManager: """Get the connection manager for this database. Returns: ConnectionManager: The connection manager instance """ return self._connections @property def executor(self) -> QueryExecutor: """Get the query executor for this database. Returns: QueryExecutor: The query executor instance """ return self._executor
[docs] @classmethod def from_engine(cls, engine: Engine, **options: object) -> "Database": """Create a :class:`Database` instance from an existing SQLAlchemy Engine. This allows you to use Moltres with an existing SQLAlchemy Engine, enabling integration with existing SQLAlchemy projects. Args: engine: SQLAlchemy Engine instance **options: Optional configuration parameters: - echo: Enable SQLAlchemy echo mode - fetch_format: Result format - "records", "pandas", or "polars" - dialect: Override SQL dialect detection - query_timeout: Query execution timeout in seconds - Other options are stored in config.options Returns: :class:`Database`: Database instance configured to use the provided Engine Example: >>> from sqlalchemy import create_engine >>> from moltres import :class:`Database` >>> engine = create_engine("sqlite:///:memory:") >>> db = :class:`Database`.from_engine(engine) >>> # Now use Moltres with your existing engine >>> from moltres.table.schema import column >>> _ = db.create_table("users", [column("id", "INTEGER")]).collect() """ from ..config import create_config from typing import cast, Any # Type cast needed because mypy doesn't understand **options unpacking config = create_config(engine=engine, **cast(dict[str, Any], options)) return cls(config=config)
[docs] @classmethod def from_connection(cls, connection: Connection, **options: object) -> "Database": """Create a :class:`Database` instance from an existing SQLAlchemy Connection. This allows you to use Moltres with an existing SQLAlchemy Connection, enabling integration within existing transactions. Note: The :class:`Database` will use the Connection's engine, but will not manage the Connection's lifecycle. The user is responsible for managing the connection. Args: connection: SQLAlchemy Connection instance **options: Optional configuration parameters (same as from_engine) Returns: :class:`Database` instance configured to use the Connection's engine Example: >>> from sqlalchemy import create_engine >>> from moltres import :class:`Database` >>> engine = create_engine("sqlite:///:memory:") >>> with engine.connect() as conn: ... db = :class:`Database`.from_connection(conn) ... # Use Moltres within the connection's transaction ... from moltres.table.schema import column ... _ = db.create_table("users", [column("id", "INTEGER")]).collect() """ # Extract engine from connection engine = connection.engine return cls.from_engine(engine, **options)
[docs] @classmethod def from_session(cls, session: "Session", **options: object) -> "Database": """Create a :class:`Database` instance from a SQLAlchemy ORM Session. This allows you to use Moltres with an existing SQLAlchemy ORM Session, enabling integration with ORM-based applications. Note: The :class:`Database` will use the Session's bind/engine, but will not manage the Session's lifecycle. The user is responsible for managing the session. Args: session: SQLAlchemy ORM Session instance **options: Optional configuration parameters (same as from_engine) Returns: :class:`Database` instance configured to use the Session's bind/engine Example: >>> from sqlalchemy import create_engine >>> from sqlalchemy.orm import sessionmaker >>> from moltres import :class:`Database` >>> engine = create_engine("sqlite:///:memory:") >>> Session = sessionmaker(bind=engine) >>> with Session() as session: ... db = :class:`Database`.from_session(session) ... # Use Moltres with your existing session ... from moltres.table.schema import column ... _ = db.create_table("users", [column("id", "INTEGER")]).collect() """ # Extract engine/bind from session if hasattr(session, "bind") and session.bind is not None: bind = session.bind # Ensure we have an Engine, not a Connection if isinstance(bind, Engine): engine = bind else: # If bind is a Connection, get its engine if hasattr(bind, "engine"): engine = bind.engine if not isinstance(engine, Engine): raise ValueError( "Session bind's engine is not a valid Engine instance. " "Ensure the session is bound to an engine." ) else: raise ValueError( "Session bind is not an Engine or Connection. " "Ensure the session is bound to an engine." ) elif hasattr(session, "connection"): # For async sessions, might have connection instead conn = session.connection() engine = conn.engine # Ensure we have an Engine, not a Connection if not isinstance(engine, Engine): raise ValueError( "Session connection's engine is not a valid Engine instance. " "Ensure the session is bound to an engine." ) else: raise ValueError( "Session does not have a bind or connection. " "Ensure the session is bound to an engine." ) return cls.from_engine(engine, **options)
[docs] def close(self) -> None: """Close all database connections and dispose of the engine. This should be called when done with the database connection, especially for ephemeral test databases. Note: After calling close(), the :class:`Database` instance should not be used. """ self._close_resources()
def __enter__(self) -> "Database": """Enter the database context manager. Returns: Database: This database instance Example: >>> with connect("sqlite:///example.db") as db: ... df = db.table("users").select() ... results = df.collect() ... # db.close() called automatically on exit """ return self def __exit__( self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: """Exit the database context manager. Automatically closes the database connection when exiting the context, even if an exception occurred. Args: exc_type: Exception type if an exception occurred, None otherwise exc_val: Exception value if an exception occurred, None otherwise exc_tb: Exception traceback if an exception occurred, None otherwise """ self.close() def _close_resources(self) -> None: if self._closed: return self._cleanup_ephemeral_tables() engine = getattr(self._connections, "_engine", None) if engine is not None: try: engine.dispose(close=True) except Exception as exc: # pragma: no cover - defensive logger.debug("Error disposing engine during close: %s", exc) finally: self._connections._engine = None self._closed = True _ACTIVE_DATABASES.discard(self) def _register_ephemeral_table(self, name: str) -> None: self._ephemeral_tables.add(name) def _unregister_ephemeral_table(self, name: str) -> None: self._ephemeral_tables.discard(name) def _cleanup_ephemeral_tables(self) -> None: """Clean up all ephemeral tables. Delegates to :class:`EphemeralTableManager`. """ if not self._ephemeral_tables: return for table_name in list(self._ephemeral_tables): try: self.drop_table(table_name, if_exists=True).collect() except Exception as exc: # pragma: no cover - best effort logger.debug("Failed to drop ephemeral table %s: %s", table_name, exc) self._ephemeral_tables.clear() @overload def table(self, name: str) -> TableHandle: """Get a handle to a table in the database from table name. Args: name: Name of the table Returns: :class:`TableHandle`: Handle to the specified table """ ... @overload def table(self, model_class: Type["DeclarativeBase"]) -> TableHandle: """Get a handle to a table in the database from SQLAlchemy model class. Args: model_class: SQLAlchemy or SQLModel model class Returns: :class:`TableHandle`: Handle to the table corresponding to the model """ ...
[docs] def table( # type: ignore[misc] self, name_or_model: "TableNameOrModel" ) -> TableHandle: """Get a handle to a table in the database. Delegates to :class:`TableManager`. Args: name_or_model: Name of the table, SQLAlchemy model class, or SQLModel model class Returns: TableHandle for the specified table Raises: ValidationError: If table name is invalid ValueError: If model_class is not a valid SQLAlchemy or SQLModel model Example: >>> from moltres import connect >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "name": "Alice"}], _database=db).insert_into("users") >>> # Get table handle by name >>> users = db.table("users") >>> df = users.select("id", "name") >>> results = df.collect() >>> results[0]["name"] 'Alice' >>> db.close() """ from .table_manager import TableManager table_manager = TableManager(self) return table_manager.table(name_or_model)
[docs] def insert( self, table_name: str, rows: Union[ Sequence[Mapping[str, object]], "Records", "pd.DataFrame", "pl.DataFrame", "pl.LazyFrame", ], ) -> int: """Insert rows into a table. Delegates to :class:`TableManager`. Args: table_name: Name of the table to insert into rows: Sequence of row dictionaries, Records, pandas DataFrame, polars DataFrame, or polars LazyFrame Returns: Number of rows inserted Raises: ValidationError: If table name is invalid or rows are empty Example: >>> from moltres import connect >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect() >>> # Insert rows >>> count = db.insert("users", [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]) >>> count 2 >>> # Verify insertion >>> df = db.table("users").select() >>> results = df.collect() >>> len(results) 2 >>> results[0]["name"] 'Alice' >>> db.close() """ from .table_manager import TableManager table_manager = TableManager(self) return table_manager.insert(table_name, rows)
[docs] def update( self, table_name: str, *, where: "Column", set: Mapping[str, object], # noqa: A002 ) -> int: """Update rows in a table. Convenience method for updating data in a table. Args: table_name: Name of the table to update where: :class:`Column` expression for the WHERE clause set: Dictionary of column names to new values Returns: Number of rows updated Raises: ValidationError: If table name is invalid or set dictionary is empty Example: >>> from moltres import connect, col >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect() >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "name": "Alice"}], _database=db).insert_into("users") >>> # Update rows >>> count = db.update("users", where=col("id") == 1, set={"name": "Alice Updated"}) >>> count 1 >>> # Verify update >>> df = db.table("users").select() >>> results = df.collect() >>> results[0]["name"] 'Alice Updated' >>> db.close() """ from .mutations import update_rows handle = self.table(table_name) return update_rows(handle, where=where, values=set)
[docs] def delete(self, table_name: str, *, where: "Column") -> int: """Delete rows from a table. Convenience method for deleting data from a table. Args: table_name: Name of the table to delete from where: :class:`Column` expression for the WHERE clause Returns: Number of rows deleted Raises: ValidationError: If table name is invalid Example: >>> from moltres import connect, col >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect() >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], _database=db).insert_into("users") >>> # Delete rows >>> count = db.delete("users", where=col("id") == 1) >>> count 1 >>> # Verify deletion >>> df = db.table("users").select() >>> results = df.collect() >>> len(results) 1 >>> results[0]["name"] 'Bob' >>> db.close() """ from .mutations import delete_rows handle = self.table(table_name) return delete_rows(handle, where=where)
[docs] def merge( self, table_name: str, rows: Union[ Sequence[Mapping[str, object]], "Records", "pd.DataFrame", "pl.DataFrame", "pl.LazyFrame", ], *, on: Sequence[str], when_matched: Optional[Mapping[str, object]] = None, when_not_matched: Optional[Mapping[str, object]] = None, ) -> int: """Merge (upsert) rows into a table. Convenience method for merging data into a table with conflict resolution. Args: table_name: Name of the table to merge into rows: Sequence of row dictionaries, :class:`Records`, pandas :class:`DataFrame`, polars :class:`DataFrame`, or polars LazyFrame on: Sequence of column names that form the conflict key when_matched: Optional dictionary of column updates when a conflict occurs when_not_matched: Optional dictionary of default values when inserting new rows Returns: Number of rows affected (inserted or updated) Raises: ValidationError: If table name is invalid, rows are empty, or on columns are invalid Example: >>> db.merge( ... "users", ... [{"id": 1, "name": "Alice", "email": "alice@example.com"}], ... on=["id"], ... when_matched={"name": "Alice Updated"} ... ) """ from .mutations import merge_rows handle = self.table(table_name) return merge_rows( handle, rows, on=on, when_matched=when_matched, when_not_matched=when_not_matched )
@property def load(self) -> "DataLoader": """Return a DataLoader for loading data from files and tables as DataFrames. Note: For SQL operations on tables, use db.table(name).select() instead. """ from ..dataframe.io.reader import DataLoader return DataLoader(self) @property def read(self) -> "ReadAccessor": """Return a ReadAccessor for accessing read operations. Use db.read.records.* for :class:`Records`-based reads (backward compatibility). Use db.load.* for :class:`DataFrame`-based reads (PySpark-style). """ from ..dataframe.io.reader import ReadAccessor return ReadAccessor(self)
[docs] def sql(self, sql: str, **params: object) -> "DataFrame": """Execute a SQL query and return a :class:`DataFrame`. Similar to PySpark's `spark.sql()`, this method accepts a raw SQL string and returns a lazy :class:`DataFrame` that can be chained with further operations. The SQL dialect is determined by the database connection. Args: sql: SQL query string to execute **params: Optional named parameters for parameterized queries. Use `:param_name` syntax in SQL and pass values as kwargs. Returns: Lazy :class:`DataFrame` that can be chained with further operations Example: >>> from moltres import connect, col >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT"), column("age", "INTEGER")]).collect() >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "name": "Alice", "age": 25}, {"id": 2, "name": "Bob", "age": 17}], _database=db).insert_into("users") >>> # Basic SQL query >>> df = db.sql("SELECT * FROM users WHERE age > 18") >>> results = df.collect() >>> len(results) 1 >>> results[0]["name"] 'Alice' >>> # Parameterized query >>> df2 = db.sql("SELECT * FROM users WHERE id = :id", id=1) >>> results2 = df2.collect() >>> results2[0]["name"] 'Alice' >>> # Chaining operations >>> db.create_table("orders", [column("id", "INTEGER"), column("amount", "REAL")]).collect() >>> _ = :class:`Records`(_data=[{"id": 1, "amount": 150.0}, {"id": 2, "amount": 50.0}], _database=db).insert_into("orders") >>> df3 = db.sql("SELECT * FROM orders").where(col("amount") > 100).limit(1) >>> results3 = df3.collect() >>> len(results3) 1 >>> results3[0]["amount"] 150.0 >>> db.close() """ from ..dataframe.core.dataframe import DataFrame from ..logical import operators # Convert params dict to the format expected by RawSQL params_dict = params if params else None plan = operators.raw_sql(sql, params_dict) return DataFrame(plan=plan, database=self)
[docs] def scan_csv( self, path: str, schema: Optional[Sequence[ColumnDef]] = None, **options: object, ) -> "PolarsDataFrame": """Scan a CSV file as a :class:`PolarsDataFrame` (Polars-style). Args: path: Path to the CSV file schema: Optional explicit schema **options: Format-specific options (e.g., header=True, delimiter=",") Returns: :class:`PolarsDataFrame` containing the CSV data (lazy) Example: >>> from moltres import connect >>> db = connect("sqlite:///:memory:") >>> df = db.scan_csv("data.csv", header=True) >>> results = df.collect() """ from .table_operations_helpers import build_scan_loader_chain loader = build_scan_loader_chain(self.read, schema, **options) return cast("PolarsDataFrame", loader.csv(path).polars())
[docs] def scan_json( self, path: str, schema: Optional[Sequence[ColumnDef]] = None, **options: object, ) -> "PolarsDataFrame": """Scan a JSON file (array of objects) as a :class:`PolarsDataFrame` (Polars-style). Args: path: Path to the JSON file schema: Optional explicit schema **options: Format-specific options (e.g., multiline=True) Returns: :class:`PolarsDataFrame` containing the JSON data (lazy) Example: >>> from moltres import connect >>> db = connect("sqlite:///:memory:") >>> df = db.scan_json("data.json") >>> results = df.collect() """ from .table_operations_helpers import build_scan_loader_chain loader = build_scan_loader_chain(self.read, schema, **options) return cast("PolarsDataFrame", loader.json(path).polars())
[docs] def scan_jsonl( self, path: str, schema: Optional[Sequence[ColumnDef]] = None, **options: object, ) -> "PolarsDataFrame": """Scan a JSONL file (one JSON object per line) as a :class:`PolarsDataFrame` (Polars-style). Args: path: Path to the JSONL file schema: Optional explicit schema **options: Format-specific options Returns: :class:`PolarsDataFrame` containing the JSONL data (lazy) Example: >>> from moltres import connect >>> db = connect("sqlite:///:memory:") >>> df = db.scan_jsonl("data.jsonl") >>> results = df.collect() """ from .table_operations_helpers import build_scan_loader_chain loader = build_scan_loader_chain(self.read, schema, **options) return cast("PolarsDataFrame", loader.jsonl(path).polars())
[docs] def scan_parquet( self, path: str, schema: Optional[Sequence[ColumnDef]] = None, **options: object, ) -> "PolarsDataFrame": """Scan a Parquet file as a :class:`PolarsDataFrame` (Polars-style). Args: path: Path to the Parquet file schema: Optional explicit schema **options: Format-specific options Returns: :class:`PolarsDataFrame` containing the Parquet data (lazy) Raises: RuntimeError: If pandas or pyarrow are not installed Example: >>> from moltres import connect >>> db = connect("sqlite:///:memory:") >>> df = db.scan_parquet("data.parquet") >>> results = df.collect() """ from .table_operations_helpers import build_scan_loader_chain loader = build_scan_loader_chain(self.read, schema, **options) return cast("PolarsDataFrame", loader.parquet(path).polars())
[docs] def scan_text( self, path: str, column_name: str = "value", schema: Optional[Sequence[ColumnDef]] = None, **options: object, ) -> "PolarsDataFrame": """Scan a text file as a single column :class:`PolarsDataFrame` (Polars-style). Args: path: Path to the text file column_name: Name of the column to create (default: "value") schema: Optional explicit schema **options: Format-specific options Returns: :class:`PolarsDataFrame` containing the text file lines (lazy) Example: >>> from moltres import connect >>> db = connect("sqlite:///:memory:") >>> df = db.scan_text("data.txt", column_name="line") >>> results = df.collect() """ from .table_operations_helpers import build_scan_loader_chain loader = build_scan_loader_chain(self.read, schema, **options) return cast("PolarsDataFrame", loader.text(path, column_name=column_name).polars())
# -------------------------------------------------------------- DDL operations @overload def create_table( self, name: str, columns: Sequence[ColumnDef], *, if_not_exists: bool = True, temporary: bool = False, constraints: Optional[ Sequence[Union["UniqueConstraint", "CheckConstraint", "ForeignKeyConstraint"]] ] = None, ) -> "CreateTableOperation": """Create a lazy create table operation from table name and columns.""" ... @overload def create_table( self, model_class: Type["DeclarativeBase"], *, if_not_exists: bool = True, temporary: bool = False, ) -> "CreateTableOperation": """Create a lazy create table operation from SQLAlchemy model class.""" ...
[docs] def create_table( # type: ignore[misc] self, name_or_model: Union[str, Type["DeclarativeBase"]], columns: Optional[Sequence[ColumnDef]] = None, *, if_not_exists: bool = True, temporary: bool = False, constraints: Optional[ Sequence[Union["UniqueConstraint", "CheckConstraint", "ForeignKeyConstraint"]] ] = None, ) -> "CreateTableOperation": """Create a lazy create table operation. Delegates to :class:`DDLManager`. Args: name_or_model: Name of the table to create, or SQLAlchemy model class columns: Sequence of ColumnDef objects defining the table schema (required if name_or_model is str) if_not_exists: If True, don't error if table already exists (default: True) temporary: If True, create a temporary table (default: False) constraints: Optional sequence of constraint objects (UniqueConstraint, CheckConstraint, ForeignKeyConstraint). Ignored if model_class is provided (constraints are extracted from model). Returns: CreateTableOperation that executes on collect() Raises: ValidationError: If table name or columns are invalid ValueError: If model_class is not a valid SQLAlchemy model Example: >>> from moltres import connect >>> from moltres.table.schema import column, unique, check >>> db = connect("sqlite:///:memory:") >>> # Create table with constraints >>> op = db.create_table( ... "users", ... [column("id", "INTEGER", primary_key=True), column("email", "TEXT")], ... constraints=[unique("email"), check("id > 0", name="ck_positive_id")] ... ) >>> table = op.collect() # Executes the CREATE TABLE >>> # Verify table was created >>> tables = db.get_table_names() >>> "users" in tables True >>> db.close() """ from .ddl_manager import DDLManager ddl_manager = DDLManager(self) return ddl_manager.create_table( name_or_model, columns, if_not_exists=if_not_exists, temporary=temporary, constraints=constraints, )
[docs] def drop_table(self, name: str, *, if_exists: bool = True) -> "DropTableOperation": """Create a lazy drop table operation. Delegates to :class:`DDLManager`. Args: name: Name of the table to drop if_exists: If True, don't error if table doesn't exist (default: True) Returns: DropTableOperation that executes on collect() Example: >>> op = db.drop_table("users") >>> op.collect() # Executes the DROP TABLE """ from .ddl_manager import DDLManager ddl_manager = DDLManager(self) return ddl_manager.drop_table(name, if_exists=if_exists)
[docs] def create_index( self, name: str, table: str, columns: Union[str, Sequence[str]], *, unique: bool = False, if_not_exists: bool = True, ) -> "CreateIndexOperation": """Create a lazy create index operation. Delegates to :class:`DDLManager`. Args: name: Name of the index to create table: Name of the table to create the index on columns: Column name(s) to index (single string or sequence) unique: If True, create a UNIQUE index (default: False) if_not_exists: If True, don't error if index already exists (default: True) Returns: CreateIndexOperation that executes on collect() Example: >>> from moltres import connect >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> db.create_table("users", [column("id", "INTEGER"), column("email", "TEXT"), column("name", "TEXT"), column("age", "INTEGER")]).collect() >>> # Create single-column index >>> op = db.create_index("idx_email", "users", "email") >>> op.collect() # Executes the CREATE INDEX >>> # Multi-column index >>> op2 = db.create_index("idx_name_age", "users", ["name", "age"], unique=True) >>> op2.collect() >>> db.close() """ from .ddl_manager import DDLManager ddl_manager = DDLManager(self) return ddl_manager.create_index( name, table, columns, unique=unique, if_not_exists=if_not_exists )
[docs] def drop_index( self, name: str, table: Optional[str] = None, *, if_exists: bool = True, ) -> "DropIndexOperation": """Create a lazy drop index operation. Args: name: Name of the index to drop table: Optional table name (required for some dialects like MySQL) if_exists: If True, don't error if index doesn't exist (default: True) Returns: DropIndexOperation that executes on collect() Example: >>> from moltres import connect >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> db.create_table("users", [column("id", "INTEGER"), column("email", "TEXT")]).collect() >>> db.create_index("idx_email", "users", "email").collect() >>> # Drop index >>> op = db.drop_index("idx_email", "users") >>> op.collect() # Executes the DROP INDEX >>> db.close() """ from .ddl_manager import DDLManager ddl_manager = DDLManager(self) return ddl_manager.drop_index(name, table=table, if_exists=if_exists)
# -------------------------------------------------------------- schema inspection
[docs] def get_table_names(self, schema: Optional[str] = None) -> List[str]: """Get list of table names in the database. Args: schema: Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema. Returns: List of table names Raises: ValueError: If database connection is not available RuntimeError: If inspection fails Example: >>> from moltres import connect >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> db.create_table("users", [column("id", "INTEGER")]).collect() >>> db.create_table("orders", [column("id", "INTEGER")]).collect() >>> # Get all table names >>> tables = db.get_table_names() >>> "users" in tables True >>> "orders" in tables True >>> db.close() """ from ..utils.inspector import get_table_names return get_table_names(self, schema=schema)
[docs] def get_view_names(self, schema: Optional[str] = None) -> List[str]: """Get list of view names in the database. Args: schema: Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema. Returns: List of view names Raises: ValueError: If database connection is not available RuntimeError: If inspection fails Example: >>> views = db.get_view_names() >>> # Returns: ['active_users_view', 'order_summary_view'] """ from ..utils.inspector import get_view_names return get_view_names(self, schema=schema)
[docs] def schema(self, table_name: str) -> List[ColumnDef]: """Get the schema (column definitions) for a table. Args: table_name: Name of the table Returns: List of :class:`ColumnDef` objects describing the table's columns Raises: ValueError: If table does not exist Example: >>> from moltres import connect >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> db.create_table("users", [column("id", "INTEGER"), column("name", "TEXT")]).collect() >>> schema = db.schema("users") >>> len(schema) 2 >>> schema[0].name 'id' >>> schema[0].type_name 'INTEGER' """ columns = self.get_columns(table_name) return [ ColumnDef( name=col.name, type_name=col.type_name, nullable=col.nullable, default=col.default, primary_key=col.primary_key, precision=col.precision, scale=col.scale, ) for col in columns ]
[docs] def tables(self, schema: Optional[str] = None) -> Dict[str, List[ColumnDef]]: """Get all tables in the database with their schemas. Args: schema: Optional schema name (for databases that support schemas) Returns: Dictionary mapping table names to their column definitions Example: >>> from moltres import connect >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> db.create_table("users", [column("id", "INTEGER")]).collect() >>> db.create_table("orders", [column("id", "INTEGER")]).collect() >>> tables = db.tables() >>> "users" in tables True >>> "orders" in tables True >>> len(tables["users"]) 1 """ table_names = self.get_table_names(schema=schema) result = {} for table_name in table_names: try: result[table_name] = self.schema(table_name) except Exception as exc: logger.debug("Failed to get schema for table %s: %s", table_name, exc) # Continue with other tables even if one fails result[table_name] = [] return result
[docs] def get_columns(self, table_name: str) -> List["ColumnInfo"]: """Get column information for a table. Args: table_name: Name of the table to inspect Returns: List of ColumnInfo objects with column metadata Raises: ValidationError: If table name is invalid ValueError: If database connection is not available RuntimeError: If table does not exist or cannot be inspected Example: >>> columns = db.get_columns("users") >>> # Returns: [ColumnInfo(name='id', type_name='INTEGER', ...), ...] """ from ..utils.exceptions import ValidationError from ..utils.inspector import get_table_columns from ..sql.builders import quote_identifier if not table_name: raise ValidationError("Table name cannot be empty") # Validate table name format quote_identifier(table_name, self._dialect.quote_char) return get_table_columns(self, table_name)
[docs] def reflect_table(self, name: str, schema: Optional[str] = None) -> "TableSchema": """Reflect a single table from the database. Args: name: Name of the table to reflect schema: Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema. Returns: TableSchema object with table metadata Raises: ValidationError: If table name is invalid ValueError: If database connection is not available RuntimeError: If table does not exist or reflection fails Example: >>> schema = db.reflect_table("users") >>> # Returns: TableSchema(name='users', columns=[ColumnDef(...), ...]) """ from ..utils.exceptions import ValidationError from ..utils.inspector import reflect_table from ..sql.builders import quote_identifier from .schema import TableSchema if not name: raise ValidationError("Table name cannot be empty") # Validate table name format quote_identifier(name, self._dialect.quote_char) reflected = reflect_table(self, name, schema=schema) column_defs = reflected[name] return TableSchema(name=name, columns=column_defs)
[docs] def reflect( self, schema: Optional[str] = None, views: bool = False ) -> Dict[str, "TableSchema"]: """Reflect entire database schema. Args: schema: Optional schema name (for multi-schema databases like PostgreSQL). If None, uses default schema. views: If True, also reflect views (default: False) Returns: Dictionary mapping table/view names to TableSchema objects Raises: ValueError: If database connection is not available RuntimeError: If reflection fails Example: >>> schemas = db.reflect() >>> # Returns: {'users': TableSchema(...), 'orders': TableSchema(...)} """ from ..utils.inspector import reflect_database from .schema import TableSchema reflected = reflect_database(self, schema=schema, views=views) # Convert to TableSchema objects result: Dict[str, TableSchema] = {} for table_name, column_defs in reflected.items(): result[table_name] = TableSchema(name=table_name, columns=column_defs) return result
# -------------------------------------------------------------- query utils
[docs] def compile_plan(self, plan: LogicalPlan) -> "Select": """Compile a logical plan to a SQLAlchemy Select statement.""" return compile_plan(plan, dialect=self._dialect)
[docs] def execute_plan(self, plan: LogicalPlan, model: Optional[Type[Any]] = None) -> QueryResult: """Execute a logical plan and return results. Delegates to :class:`DatabaseQueryExecutor`. """ from .query_executor import DatabaseQueryExecutor executor = DatabaseQueryExecutor(self) return executor.execute_plan(plan, model=model)
[docs] def execute_plan_stream(self, plan: LogicalPlan) -> Iterator[List[Dict[str, object]]]: """Execute a plan and return an iterator of row chunks. Delegates to :class:`DatabaseQueryExecutor`. """ from .query_executor import DatabaseQueryExecutor executor = DatabaseQueryExecutor(self) return executor.execute_plan_stream(plan)
[docs] def execute_sql(self, sql: str, params: Optional[Dict[str, Any]] = None) -> QueryResult: """Execute a raw SQL query. Delegates to :class:`DatabaseQueryExecutor`. """ from .query_executor import DatabaseQueryExecutor executor = DatabaseQueryExecutor(self) return executor.execute_sql(sql, params=params)
[docs] def explain(self, sql: str, params: Optional[Dict[str, Any]] = None) -> str: """Get the execution plan for a SQL query. Delegates to :class:`DatabaseQueryExecutor`. Args: sql: SQL query string params: Optional query parameters Returns: Execution plan as a string (dialect-specific) Example: >>> plan = db.explain("SELECT * FROM users WHERE id = :id", params={"id": 1}) >>> print(plan) """ from .query_executor import DatabaseQueryExecutor executor = DatabaseQueryExecutor(self) return executor.explain(sql, params=params)
[docs] def show_tables(self, schema: Optional[str] = None) -> None: """Print a formatted list of tables in the database. Convenience method for interactive exploration. Args: schema: Optional schema name Example: >>> db.show_tables() Tables in database: - users - orders - products """ tables = self.get_table_names(schema=schema) if tables: print("Tables in database:") for table in sorted(tables): print(f" - {table}") else: print("No tables found in database.")
[docs] def show_schema(self, table_name: str) -> None: """Print a formatted schema for a table. Convenience method for interactive exploration. Args: table_name: Name of the table Example: >>> db.show_schema("users") Schema for table 'users': - id: INTEGER (primary_key=True) - name: TEXT - email: TEXT """ from ..utils.exceptions import ValidationError if not table_name: raise ValidationError("Table name cannot be empty") columns = self.get_columns(table_name) if columns: print(f"Schema for table '{table_name}':") for col_info in columns: attrs = [] if col_info.primary_key: attrs.append("primary_key=True") if col_info.nullable is False: attrs.append("nullable=False") if col_info.default is not None: attrs.append(f"default={col_info.default}") attr_str = f" ({', '.join(attrs)})" if attrs else "" print(f" - {col_info.name}: {col_info.type_name}{attr_str}") else: print(f"No columns found for table '{table_name}'.")
@property def dialect(self) -> DialectSpec: return self._dialect
[docs] def is_in_transaction(self) -> bool: """Check if currently in a transaction. Returns: True if a transaction is active, False otherwise Example: >>> if db.is_in_transaction(): ... status = db.get_transaction_status() ... print(f"Isolation: {status['isolation_level']}") """ return self._connections.active_transaction is not None
[docs] def get_transaction_status(self) -> Optional[dict[str, object]]: """Get transaction status and metadata if a transaction is active. Returns: Dictionary with transaction metadata including: - readonly: Whether the transaction is read-only - isolation_level: Transaction isolation level (if set) - timeout: Transaction timeout in seconds (if set) - savepoints: List of active savepoint names None if no transaction is active Example: >>> with db.transaction(isolation_level="SERIALIZABLE", readonly=True) as txn: ... status = db.get_transaction_status() ... assert status["isolation_level"] == "SERIALIZABLE" ... assert status["readonly"] is True """ if not self.is_in_transaction(): return None metadata = self._connections.transaction_metadata or {} return { "readonly": metadata.get("readonly", False), "isolation_level": metadata.get("isolation_level"), "timeout": metadata.get("timeout"), "savepoints": self._connections.savepoint_stack, }
[docs] def batch(self) -> "OperationBatch": """Create a batch context for grouping multiple operations. All operations within the batch context are executed together in a single transaction when the context exits. If any exception occurs, all operations are rolled back. Returns: OperationBatch context manager Example: >>> with db.batch(): ... db.create_table("users", [...]) ... # All operations execute together on exit """ from .batch import OperationBatch return OperationBatch(self)
[docs] @contextmanager def transaction( self, savepoint: bool = False, readonly: bool = False, isolation_level: Optional[str] = None, timeout: Optional[float] = None, ) -> Iterator[Transaction]: """Create a transaction context for grouping multiple operations. All operations within the transaction context share the same transaction. If any exception occurs, the transaction is automatically rolled back. Otherwise, it is committed on successful exit. Args: savepoint: If True and a transaction is already active, create a savepoint instead of raising an error. This enables nested transactions. readonly: If True, set the transaction to read-only mode. Prevents writes. isolation_level: Optional isolation level. Must be one of: - "READ UNCOMMITTED" - "READ COMMITTED" - "REPEATABLE READ" - "SERIALIZABLE" timeout: Optional transaction timeout in seconds. Database-specific behavior: - PostgreSQL: Sets statement_timeout (milliseconds) - MySQL: Sets innodb_lock_wait_timeout (seconds) - SQLite: Not supported Yields: :class:`Transaction` object that can be used for explicit commit/rollback Example: Basic transaction:: >>> with db.transaction() as txn: ... df.write.insertInto("table") ... df.write.update("table", where=..., set={...}) ... # If any operation fails, all are rolled back ... # Otherwise, all are committed on exit Nested transaction with savepoint:: >>> with db.transaction() as outer: ... # ... operations ... ... with db.transaction(savepoint=True) as inner: ... # ... operations that can be rolled back independently ... ... inner.savepoint("checkpoint") ... # ... operations ... ... inner.rollback_to_savepoint("checkpoint") ... # outer transaction continues... Read-only transaction:: >>> with db.transaction(readonly=True) as txn: ... results = db.table("users").select().collect() Transaction with isolation level:: >>> with db.transaction(isolation_level="SERIALIZABLE") as txn: ... # ... critical operations requiring highest isolation ... """ # Check if there's already an active transaction (for savepoint detection) had_active_transaction = self._connections.active_transaction is not None connection = self._connections.begin_transaction( savepoint=savepoint, readonly=readonly, isolation_level=isolation_level, timeout=timeout, ) metadata = self._connections.transaction_metadata or {} # If savepoint=True was requested and there was already an active transaction, # this is a savepoint transaction is_savepoint_txn = savepoint and had_active_transaction txn = Transaction( self, connection, readonly=bool(metadata.get("readonly", False)) if metadata else readonly, isolation_level=cast(Optional[str], metadata.get("isolation_level")) if metadata else isolation_level, is_savepoint=is_savepoint_txn, ) # Track savepoint name if this is a savepoint savepoint_name: Optional[str] = None if is_savepoint_txn: savepoint_stack = self._connections.savepoint_stack if savepoint_stack: savepoint_name = savepoint_stack[-1] # Call Transaction's __enter__ to set up hooks and metrics from ..utils.transaction_hooks import _execute_hooks, _on_begin_hooks # Execute begin hooks _execute_hooks(_on_begin_hooks, txn) # Start metrics tracking import time metrics_start_time = time.time() exc_info = None committed = False try: yield txn if not txn._committed and not txn._rolled_back: if is_savepoint_txn and savepoint_name: # For savepoints, we don't commit - the outer transaction handles it # But we should release the savepoint try: txn.release_savepoint(savepoint_name) except RuntimeError: # Savepoint may have already been released pass else: txn.commit() committed = True else: committed = txn._committed except Exception as exc: exc_info = exc if not txn._rolled_back: if is_savepoint_txn and savepoint_name: # For savepoints, rollback to the savepoint try: txn.rollback_to_savepoint(savepoint_name) except RuntimeError: # Fallback to regular rollback if savepoint rollback fails txn.rollback() else: txn.rollback() raise finally: # Record metrics (always called, even on exception) from ..utils.transaction_metrics import get_transaction_metrics duration = time.time() - metrics_start_time final_committed = committed if not exc_info else False metrics = get_transaction_metrics() metrics.record_transaction( duration=duration, committed=final_committed, has_savepoint=is_savepoint_txn, readonly=readonly, isolation_level=isolation_level, error=exc_info if exc_info else None, )
[docs] def createDataFrame( self, data: Union[ Sequence[dict[str, Any]], Sequence[tuple], Records, "LazyRecords", "pd.DataFrame", "pl.DataFrame", "pl.LazyFrame", ], schema: Optional[Sequence[ColumnDef]] = None, pk: Optional[Union[str, Sequence[str]]] = None, auto_pk: Optional[Union[str, Sequence[str]]] = None, ) -> "DataFrame": """Create a DataFrame from Python data. Delegates to :class:`EphemeralTableManager`. Creates a temporary table, inserts the data, and returns a DataFrame querying from that table. If LazyRecords is provided, it will be auto-materialized. If pandas/polars DataFrame or LazyFrame is provided, it will be converted to Records with lazy conversion. Args: data: Input data in one of supported formats: - List of dicts: [{"col1": val1, "col2": val2}, ...] - List of tuples: Requires schema parameter with column names - Records object: Extracts data and schema if available - LazyRecords object: Auto-materializes and extracts data and schema - pandas DataFrame: Converts to Records with schema preservation - polars DataFrame: Converts to Records with schema preservation - polars LazyFrame: Materializes and converts to Records with schema preservation schema: Optional explicit schema. If not provided, schema is inferred from data. pk: Optional column name(s) to mark as primary key. Can be a single string or sequence of strings for composite keys. auto_pk: Optional column name(s) to create as auto-incrementing primary key. Can specify same name as pk to make an existing column auto-incrementing. Returns: DataFrame querying from the created temporary table Raises: ValueError: If data is empty and no schema provided, or if primary key requirements are not met ValidationError: If list of tuples provided without schema, or other validation errors Example: >>> # Create DataFrame from list of dicts >>> df = db.createDataFrame([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], pk="id") >>> # Create DataFrame with auto-incrementing primary key >>> df = db.createDataFrame([{"name": "Alice"}, {"name": "Bob"}], auto_pk="id") >>> # Create DataFrame from Records >>> from moltres.io.records import Records >>> records = Records(_data=[{"id": 1, "name": "Alice"}], _database=db) >>> df = db.createDataFrame(records, pk="id") >>> # Create DataFrame from LazyRecords (auto-materializes) >>> lazy_records = db.read.records.csv("data.csv") >>> df = db.createDataFrame(lazy_records, pk="id") >>> # Create DataFrame from pandas DataFrame >>> import pandas as pd >>> pdf = pd.DataFrame([{"id": 1, "name": "Alice"}]) >>> df = db.createDataFrame(pdf, pk="id") >>> # Create DataFrame from polars DataFrame >>> import polars as pl >>> plf = pl.DataFrame([{"id": 1, "name": "Alice"}]) >>> df = db.createDataFrame(plf, pk="id") """ from .ephemeral_manager import EphemeralTableManager manager = EphemeralTableManager(self) return manager.create_dataframe(data, schema=schema, pk=pk, auto_pk=auto_pk)
[docs] def create_dataframe( self, data: Union[ Sequence[dict[str, object]], Sequence[tuple], Records, "LazyRecords", "pd.DataFrame", "pl.DataFrame", "pl.LazyFrame", ], schema: Optional[Sequence[ColumnDef]] = None, pk: Optional[Union[str, Sequence[str]]] = None, auto_pk: Optional[Union[str, Sequence[str]]] = None, ) -> "DataFrame": """Create a DataFrame from Python data (snake_case alias for createDataFrame). This is an alias for :meth:`createDataFrame`. See :meth:`createDataFrame` for full documentation. Args: data: Input data in one of supported formats schema: Optional explicit schema pk: Optional column name(s) to mark as primary key auto_pk: Optional column name(s) to create as auto-incrementing primary key Returns: DataFrame querying from the created temporary table """ return self.createDataFrame(data, schema=schema, pk=pk, auto_pk=auto_pk)
# ----------------------------------------------------------------- internals @property def _dialect_name(self) -> str: if self.config.engine.dialect: return self.config.engine.dialect # If session is provided, extract dialect from session's bind (engine) if self.config.engine.session is not None: session = self.config.engine.session if hasattr(session, "get_bind"): bind = session.get_bind() elif hasattr(session, "bind"): bind = session.bind else: bind = None if bind is not None: dialect_name = getattr(getattr(bind, "dialect", None), "name", None) if dialect_name: # Normalize driver variants (e.g., "mysql+aiomysql" -> "mysql") if "+" in dialect_name: dialect_name = dialect_name.split("+", 1)[0] return str(dialect_name) # Extract dialect from DSN, normalizing driver variants (e.g., "mysql+pymysql" -> "mysql") dsn = self.config.engine.dsn if not dsn: return "ansi" dialect_part = dsn.split(":", 1)[0] # Normalize driver variants: "mysql+pymysql" -> "mysql", "postgresql+psycopg2" -> "postgresql" if "+" in dialect_part: dialect_part = dialect_part.split("+", 1)[0] return dialect_part
def _cleanup_all_databases() -> None: """Best-effort cleanup for any :class:`Database` instances left open at exit. This is called on normal interpreter shutdown and on signal handlers for crash scenarios (SIGTERM, SIGINT). """ for db in list(_ACTIVE_DATABASES): try: db._close_resources() except Exception as exc: # pragma: no cover - atexit safeguard logger.debug("Database cleanup during interpreter shutdown failed: %s", exc) def _signal_handler(signum: int, frame: Optional[FrameType]) -> None: """Handle signals (SIGTERM, SIGINT) by cleaning up databases before exit.""" logger.info("Received signal %d, cleaning up databases...", signum) _cleanup_all_databases() # Re-raise the signal with default handler signal.signal(signum, signal.SIG_DFL) import os os.kill(os.getpid(), signum) # Register signal handlers for crash scenarios (only on main thread) try: # Check if we can register signal handlers (main thread only) signal.signal(signal.SIGTERM, _signal_handler) signal.signal(signal.SIGINT, _signal_handler) except (ValueError, OSError): # Signal handlers can only be registered on the main thread # This is expected in some contexts (e.g., subprocesses, threads) pass def _force_database_cleanup_for_tests() -> None: """Helper used by tests to simulate crash/GC cleanup.""" _cleanup_all_databases() atexit.register(_cleanup_all_databases)