Source code for moltres.expressions.functions.window

"""Window functions for :class:`DataFrame` operations."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, List, Optional

from ..column import Column, ColumnLike, ensure_column

if TYPE_CHECKING:
    from ..expr import ExpressionArg
else:
    ExpressionArg = Any


[docs] def row_number() -> Column: """Generate a row number for each row in a window. Returns: :class:`Column` expression for row_number() window function Example: >>> from moltres import connect, col >>> from moltres.expressions import functions as F >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("sales", [column("id", "INTEGER"), column("category", "TEXT"), column("amount", "REAL")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "category": "A", "amount": 100.0}, {"id": 2, "category": "A", "amount": 200.0}], _database=db).insert_into("sales") >>> df = db.table("sales").select().withColumn("row_num", F.row_number().over(partition_by=col("category"), order_by=col("amount"))) >>> results = df.collect() >>> results[0]["row_num"] 1 >>> results[1]["row_num"] 2 >>> db.close() """ return Column(op="window_row_number", args=())
[docs] def rank() -> Column: """Compute the rank of rows within a window. Returns: :class:`Column` expression for rank() window function Example: >>> from moltres import connect, col >>> from moltres.expressions import functions as F >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("scores", [column("id", "INTEGER"), column("category", "TEXT"), column("score", "REAL")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "category": "A", "score": 100.0}, {"id": 2, "category": "A", "score": 100.0}, {"id": 3, "category": "A", "score": 90.0}], _database=db).insert_into("scores") >>> df = db.table("scores").select().withColumn("rank", F.rank().over(partition_by=col("category"), order_by=col("score"))) >>> results = df.collect() >>> sorted_results = sorted(results, key=lambda x: x["id"]) >>> # With ascending order: score 90.0 gets rank 1, scores 100.0 get rank 2 >>> sorted_results[0]["rank"] # id=1, score=100.0 2 >>> sorted_results[1]["rank"] # id=2, score=100.0 2 >>> sorted_results[2]["rank"] # id=3, score=90.0 1 >>> db.close() """ return Column(op="window_rank", args=())
[docs] def dense_rank() -> Column: """Compute the dense rank of rows within a window. Returns: :class:`Column` expression for dense_rank() window function Example: >>> from moltres import connect, col >>> from moltres.expressions import functions as F >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("scores", [column("id", "INTEGER"), column("category", "TEXT"), column("score", "REAL")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "category": "A", "score": 100.0}, {"id": 2, "category": "A", "score": 100.0}, {"id": 3, "category": "A", "score": 90.0}], _database=db).insert_into("scores") >>> df = db.table("scores").select().withColumn("dense_rank", F.dense_rank().over(partition_by=col("category"), order_by=col("score"))) >>> results = df.collect() >>> sorted_results = sorted(results, key=lambda x: x["id"]) >>> sorted_results[0]["dense_rank"] # id=1, score=100.0 2 >>> sorted_results[1]["dense_rank"] # id=2, score=100.0 2 >>> sorted_results[2]["dense_rank"] # id=3, score=90.0 1 >>> db.close() """ return Column(op="window_dense_rank", args=())
[docs] def percent_rank() -> Column: """Compute the percent rank of rows within a window. Returns: :class:`Column` expression for percent_rank() window function Example: >>> from moltres import connect, col >>> from moltres.expressions import functions as F >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("scores", [column("id", "INTEGER"), column("category", "TEXT"), column("score", "REAL")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "category": "A", "score": 100.0}, {"id": 2, "category": "A", "score": 90.0}], _database=db).insert_into("scores") >>> df = db.table("scores").select().withColumn("percent_rank", F.percent_rank().over(partition_by=col("category"), order_by=col("score"))) >>> results = df.collect() >>> # percent_rank returns values between 0.0 and 1.0 >>> any(0.0 <= r["percent_rank"] <= 1.0 for r in results) True >>> db.close() """ return Column(op="window_percent_rank", args=())
[docs] def cume_dist() -> Column: """Compute the cumulative distribution of rows within a window. Returns: :class:`Column` expression for cume_dist() window function Example: >>> from moltres import connect, col >>> from moltres.expressions import functions as F >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("scores", [column("id", "INTEGER"), column("category", "TEXT"), column("score", "REAL")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "category": "A", "score": 100.0}, {"id": 2, "category": "A", "score": 90.0}], _database=db).insert_into("scores") >>> df = db.table("scores").select().withColumn("cume_dist", F.cume_dist().over(partition_by=col("category"), order_by=col("score"))) >>> results = df.collect() >>> # cume_dist returns values between 0.0 and 1.0 >>> any(0.0 <= r["cume_dist"] <= 1.0 for r in results) True >>> db.close() """ return Column(op="window_cume_dist", args=())
[docs] def nth_value(column: ColumnLike, n: int) -> Column: """Get the nth value in a window. Args: column: :class:`Column` expression to get the value from n: The position (1-based) of the value to retrieve Returns: :class:`Column` expression for nth_value() window function Example: >>> from moltres import connect, col >>> from moltres.expressions import functions as F >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("sales", [column("id", "INTEGER"), column("category", "TEXT"), column("amount", "REAL"), column("date", "DATE")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "category": "A", "amount": 100.0, "date": "2024-01-01"}, {"id": 2, "category": "A", "amount": 200.0, "date": "2024-01-02"}], _database=db).insert_into("sales") >>> df = db.table("sales").select().withColumn("second_amount", F.nth_value(col("amount"), 2).over(partition_by=col("category"), order_by=col("date"))) >>> results = df.collect() >>> # nth_value(2) returns the second value in the window >>> any("second_amount" in r for r in results) True >>> db.close() """ return Column(op="window_nth_value", args=(ensure_column(column), n))
[docs] def ntile(n: int) -> Column: """Divide rows into n roughly equal groups. Args: n: Number of groups to divide rows into Returns: :class:`Column` expression for ntile() window function Example: >>> from moltres import connect, col >>> from moltres.expressions import functions as F >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("scores", [column("id", "INTEGER"), column("score", "REAL")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "score": 100.0}, {"id": 2, "score": 90.0}, {"id": 3, "score": 80.0}, {"id": 4, "score": 70.0}], _database=db).insert_into("scores") >>> df = db.table("scores").select().withColumn("quartile", F.ntile(4).over(order_by=col("score"))) >>> results = df.collect() >>> # ntile(4) divides rows into 4 groups (1-4) >>> any(1 <= r["quartile"] <= 4 for r in results) True >>> db.close() """ return Column(op="window_ntile", args=(n,))
[docs] def lag(column: ColumnLike, offset: int = 1, default: Optional[ColumnLike] = None) -> Column: """Get the value of a column from a previous row in the window. Args: column: :class:`Column` to get the lagged value from offset: Number of rows to look back (default: 1) default: Default value if offset goes beyond window (optional) Returns: :class:`Column` expression for lag() window function Example: >>> from moltres import connect, col >>> from moltres.expressions import functions as F >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("data", [column("id", "INTEGER"), column("value", "REAL"), column("date", "DATE")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "value": 10.0, "date": "2024-01-01"}, {"id": 2, "value": 20.0, "date": "2024-01-02"}], _database=db).insert_into("data") >>> df = db.table("data").select().withColumn("prev_value", F.lag(col("value"), offset=1).over(order_by=col("date"))) >>> results = df.collect() >>> sorted_results = sorted(results, key=lambda x: x["id"]) >>> sorted_results[0]["prev_value"] is None # First row has no previous value True >>> sorted_results[1]["prev_value"] # Second row gets previous value 10.0 >>> db.close() """ args: List[ExpressionArg] = [ensure_column(column), offset] if default is not None: args.append(ensure_column(default)) return Column(op="window_lag", args=tuple(args))
[docs] def lead(column: ColumnLike, offset: int = 1, default: Optional[ColumnLike] = None) -> Column: """Get the value of a column from a following row in the window. Args: column: :class:`Column` to get the leading value from offset: Number of rows to look ahead (default: 1) default: Default value if offset goes beyond window (optional) Returns: :class:`Column` expression for lead() window function Example: >>> from moltres import connect, col >>> from moltres.expressions import functions as F >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("data", [column("id", "INTEGER"), column("value", "REAL"), column("date", "DATE")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "value": 10.0, "date": "2024-01-01"}, {"id": 2, "value": 20.0, "date": "2024-01-02"}], _database=db).insert_into("data") >>> df = db.table("data").select().withColumn("next_value", F.lead(col("value"), offset=1).over(order_by=col("date"))) >>> results = df.collect() >>> sorted_results = sorted(results, key=lambda x: x["id"]) >>> sorted_results[0]["next_value"] # First row gets next value 20.0 >>> sorted_results[1]["next_value"] is None # Last row has no next value True >>> db.close() """ args: List[ExpressionArg] = [ensure_column(column), offset] if default is not None: args.append(ensure_column(default)) return Column(op="window_lead", args=tuple(args))
[docs] def first_value(column: ColumnLike) -> Column: """Get the first value in a window (window function). Args: column: :class:`Column` expression to get the first value from Returns: :class:`Column` expression for first_value() window function Example: >>> from moltres import connect, col >>> from moltres.expressions import functions as F >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("sales", [column("id", "INTEGER"), column("category", "TEXT"), column("amount", "REAL"), column("date", "DATE")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "category": "A", "amount": 100.0, "date": "2024-01-01"}, {"id": 2, "category": "A", "amount": 200.0, "date": "2024-01-02"}], _database=db).insert_into("sales") >>> df = db.table("sales").select().withColumn("first_amount", F.first_value(col("amount")).over(partition_by=col("category"), order_by=col("date"))) >>> results = df.collect() >>> sorted_results = sorted(results, key=lambda x: x["id"]) >>> sorted_results[0]["first_amount"] # First value in window 100.0 >>> sorted_results[1]["first_amount"] # First value in window (same partition) 100.0 >>> db.close() """ return Column(op="window_first_value", args=(ensure_column(column),))
[docs] def last_value(column: ColumnLike) -> Column: """Get the last value in a window (window function). Args: column: :class:`Column` expression to get the last value from Returns: :class:`Column` expression for last_value() window function Example: >>> from moltres import connect, col >>> from moltres.expressions import functions as F >>> from moltres.table.schema import column >>> db = connect("sqlite:///:memory:") >>> _ = db.create_table("sales", [column("id", "INTEGER"), column("category", "TEXT"), column("amount", "REAL"), column("date", "DATE")]).collect() # doctest: +ELLIPSIS >>> from moltres.io.records import :class:`Records` >>> _ = :class:`Records`(_data=[{"id": 1, "category": "A", "amount": 100.0, "date": "2024-01-01"}, {"id": 2, "category": "A", "amount": 200.0, "date": "2024-01-02"}], _database=db).insert_into("sales") >>> df = db.table("sales").select().withColumn("last_amount", F.last_value(col("amount")).over(partition_by=col("category"), order_by=col("date"))) >>> results = df.collect() >>> sorted_results = sorted(results, key=lambda x: x["id"]) >>> sorted_results[0]["last_amount"] # Last value in window (up to current row) 100.0 >>> sorted_results[1]["last_amount"] # Last value in window (up to current row) 200.0 >>> db.close() """ return Column(op="window_last_value", args=(ensure_column(column),))