"""Query execution helpers (moltres facade over :mod:`moltres_core.sql`).
Translates :mod:`moltres_core.exceptions` into :mod:`moltres.utils.exceptions` so
framework integrations (FastAPI, etc.) keep matching registered handlers.
"""
from __future__ import annotations
from typing import Any, Dict, Optional, Sequence, Type, Union
from moltres_core.exceptions import ExecutionError as _CoreExecutionError
from moltres_core.exceptions import QueryTimeoutError as _CoreQueryTimeoutError
from moltres_core.sql.execution import (
QueryResult,
register_performance_hook,
unregister_performance_hook,
)
from moltres_core.sql.execution import QueryExecutor as _CoreQueryExecutor
from moltres.utils.exceptions import ExecutionError, QueryTimeoutError
[docs]
class QueryExecutor(_CoreQueryExecutor):
"""Same as :class:`moltres_core.sql.execution.QueryExecutor` with public Moltres exceptions."""
[docs]
def fetch(
self,
stmt: Union[str, Any],
params: Optional[Dict[str, Any]] = None,
connection: Any = None,
model: Optional[Type[Any]] = None,
) -> QueryResult:
try:
return super().fetch(stmt, params=params, connection=connection, model=model)
except _CoreQueryTimeoutError as exc:
timeout = exc.context.get("timeout_seconds") if exc.context else None
raise QueryTimeoutError(
exc.message, timeout=timeout, context=dict(exc.context)
) from exc
except _CoreExecutionError as exc:
raise ExecutionError(
exc.message, suggestion=exc.suggestion, context=dict(exc.context)
) from exc
[docs]
def execute(
self,
sql: str,
params: Optional[Dict[str, Any]] = None,
transaction: Any = None,
) -> QueryResult:
try:
return super().execute(sql, params=params, transaction=transaction)
except _CoreQueryTimeoutError as exc:
timeout = exc.context.get("timeout_seconds") if exc.context else None
raise QueryTimeoutError(
exc.message, timeout=timeout, context=dict(exc.context)
) from exc
except _CoreExecutionError as exc:
raise ExecutionError(
exc.message, suggestion=exc.suggestion, context=dict(exc.context)
) from exc
[docs]
def execute_many(
self,
sql: str,
params_list: Sequence[Dict[str, Any]],
transaction: Any = None,
) -> QueryResult:
try:
return super().execute_many(sql, params_list, transaction=transaction)
except _CoreQueryTimeoutError as exc:
timeout = exc.context.get("timeout_seconds") if exc.context else None
raise QueryTimeoutError(
exc.message, timeout=timeout, context=dict(exc.context)
) from exc
except _CoreExecutionError as exc:
raise ExecutionError(
exc.message, suggestion=exc.suggestion, context=dict(exc.context)
) from exc
__all__ = [
"QueryExecutor",
"QueryResult",
"register_performance_hook",
"unregister_performance_hook",
]