PySpark to Moltres Migration Guide
This guide helps you migrate from PySpark to Moltres, highlighting key differences and providing migration examples.
Overview
Moltres provides a PySpark-like API but compiles operations to SQL instead of executing on a Spark cluster. This means:
No cluster setup required - works with any SQL database
SQL pushdown - operations are compiled to SQL and executed in the database
Lazy evaluation - similar to PySpark’s lazy evaluation model
Familiar API - many methods have the same names and signatures
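To make "lazy evaluation" and "SQL pushdown" concrete, here is a minimal toy sketch. It is not Moltres code and does not reflect Moltres internals: the LazyQuery class and its methods are hypothetical, and Python's built-in sqlite3 module stands in for "any SQL database". Each method only records what was asked for. SQL is generated and executed only when collect() is called.

```python
import sqlite3


class LazyQuery:
    """Hypothetical toy: records operations, runs SQL only on collect()."""

    def __init__(self, conn, table, columns=("*",), filters=(), params=()):
        self.conn = conn
        self.table = table
        self.columns = tuple(columns)
        self.filters = tuple(filters)
        self.params = tuple(params)

    def select(self, *columns):
        # Returns a new query object; nothing is sent to the database yet.
        return LazyQuery(self.conn, self.table, columns, self.filters, self.params)

    def where(self, condition, *params):
        # condition is a SQL fragment using "?" placeholders for params.
        return LazyQuery(self.conn, self.table, self.columns,
                         self.filters + (condition,), self.params + params)

    def to_sql(self):
        sql = f"SELECT {', '.join(self.columns)} FROM {self.table}"
        if self.filters:
            sql += " WHERE " + " AND ".join(self.filters)
        return sql

    def collect(self):
        # Execution happens here, inside the database engine.
        cursor = self.conn.execute(self.to_sql(), self.params)
        names = [d[0] for d in cursor.description]
        return [dict(zip(names, row)) for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, age INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?, ?)",
                 [(1, "Ada", 36), (2, "Bob", 17), (3, "Cy", 52)])

query = LazyQuery(conn, "users").select("id", "name").where("age > ?", 18)
print(query.to_sql())  # SELECT id, name FROM users WHERE age > ?
print(query.collect())
```

The filtering runs inside the database, so only matching rows reach Python. That is the property the list above calls SQL pushdown.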
Key Differences
1. Initialization
PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("app").getOrCreate()
df = spark.read.table("users")
Moltres:
from moltres import connect
db = connect("postgresql://user:pass@host/dbname")
df = db.table("users").select()
2. Reading Data
PySpark:
df = spark.read.csv("data.csv")
df = spark.read.json("data.json")
df = spark.read.parquet("data.parquet")
Moltres:
df = db.load.csv("data.csv")
df = db.load.json("data.json")
df = db.load.parquet("data.parquet")
3. Basic Operations
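Most DataFrame operations work similarly. The main visible difference is naming: PySpark methods are camelCase (orderBy, groupBy, withColumn), while the Moltres examples in this guide use snake_case (order_by, group_by, with_column).
PySpark:
from pyspark.sql.functions import col
df.select("id", "name").where(col("age") > 18).orderBy(col("name"))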
Moltres:
from moltres import col
df.select("id", "name").where(col("age") > 18).order_by(col("name"))
4. Aggregations
PySpark:
from pyspark.sql.functions import sum, avg, count
df.groupBy("category").agg(
    sum("amount").alias("total"),
    avg("amount").alias("average"),
    count("*").alias("count"),
)
Moltres:
from moltres.expressions.functions import sum, avg, count
from moltres import col
df.group_by("category").agg(
    sum(col("amount")).alias("total"),
    avg(col("amount")).alias("average"),
    count("*").alias("count"),
)
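For orientation, the aggregation above corresponds to an ordinary GROUP BY query. The sketch below runs such a query with Python's built-in sqlite3 module. The orders table and its rows are made up for illustration, and the SQL is a plausible hand-written equivalent, not the exact text Moltres generates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (category TEXT, amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?)",
                 [("books", 10.0), ("books", 30.0), ("toys", 5.0)])

# Hand-written equivalent of group_by("category").agg(sum, avg, count).
sql = """
SELECT category,
       SUM(amount) AS total,
       AVG(amount) AS average,
       COUNT(*)    AS "count"
FROM orders
GROUP BY category
ORDER BY category
"""
rows = conn.execute(sql).fetchall()
for row in rows:
    print(row)
# ('books', 40.0, 20.0, 2)
# ('toys', 5.0, 5.0, 1)
```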
5. Joins
PySpark:
df1.join(df2, on="id", how="left")
df1.join(df2, on=df1.id == df2.customer_id, how="inner")
Moltres:
df1.join(df2, on="id", how="left")
df1.join(df2, on=[col("df1.id") == col("df2.customer_id")], how="inner")
6. Window Functions
PySpark:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, rank, row_number
window = Window.partitionBy("category").orderBy(col("amount").desc())
df.withColumn("rank", rank().over(window))
Moltres:
from moltres.expressions.window import Window
from moltres.expressions.functions import rank
window = Window.partition_by(col("category")).order_by(col("amount").desc())
df.with_column("rank", rank().over(window))
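In SQL terms, the window definition above corresponds to an OVER (PARTITION BY ... ORDER BY ...) clause. The following sketch runs one with Python's built-in sqlite3 module, which needs SQLite 3.25 or newer for window functions. The sales table, its rows, and the amount_rank alias are invented for illustration, and the query is a hand-written equivalent rather than Moltres output.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (category TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?, ?)",
                 [("books", 30.0), ("books", 30.0), ("books", 10.0), ("toys", 5.0)])

# RANK() restarts at 1 for each category and leaves gaps after ties.
sql = """
SELECT category, amount,
       RANK() OVER (PARTITION BY category ORDER BY amount DESC) AS amount_rank
FROM sales
ORDER BY category, amount DESC
"""
rows = conn.execute(sql).fetchall()
for row in rows:
    print(row)
# ('books', 30.0, 1)
# ('books', 30.0, 1)
# ('books', 10.0, 3)
# ('toys', 5.0, 1)
```

The tied rows show how rank() differs from row_number(), which would number the same rows 1, 2, 3.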
7. UDFs (User-Defined Functions)
PySpark:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
@udf(returnType=StringType())
def my_udf(x):
    # Guard against NULL values, which arrive as None
    return x.upper() if x is not None else None
df.withColumn("upper", my_udf(col("name")))
Moltres:
Moltres does not support UDFs directly, because operations are pushed down to SQL. Use built-in SQL functions instead:
from moltres.expressions.functions import upper
df.with_column("upper", upper(col("name")))
Or use raw SQL expressions for complex logic.
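As a rough illustration of what SQL expressions and database-side functions can replace, the sketch below uses Python's built-in sqlite3 module, not Moltres. The users table, the initials function, and the age_group logic are all hypothetical. A CASE expression covers branching logic, and a function registered with the database covers logic that has no built-in equivalent.

```python
import sqlite3


def initials(name):
    # Plain Python function, registered with SQLite below.
    return "".join(part[0].upper() for part in name.split())


conn = sqlite3.connect(":memory:")
# sqlite3-specific: expose the Python function to SQL under the name "initials".
conn.create_function("initials", 1, initials)
conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?)",
                 [("ada lovelace", 36), ("bob stone", 17)])

sql = """
SELECT UPPER(name) AS upper_name,
       CASE WHEN age >= 18 THEN 'adult' ELSE 'minor' END AS age_group,
       initials(name) AS name_initials
FROM users
ORDER BY name
"""
rows = conn.execute(sql).fetchall()
for row in rows:
    print(row)
# ('ADA LOVELACE', 'adult', 'AL')
# ('BOB STONE', 'minor', 'BS')
```

The registration call is specific to SQLite's Python driver. Server databases typically define functions with their own CREATE FUNCTION syntax, so this part does not carry over unchanged.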
8. Collecting Results
PySpark:
rows = df.collect() # Returns list of Row objects
df.show() # Prints first 20 rows
Moltres:
rows = df.collect() # Returns list of dicts
df.show() # Prints first 20 rows
9. Writing Data
PySpark:
df.write.mode("overwrite").parquet("output.parquet")
df.write.mode("append").csv("output.csv")
Moltres:
df.write.parquet("output.parquet", mode="overwrite")
df.write.csv("output.csv", mode="append")
Migration Checklist
Replace SparkSession with Database connection
Use connect() instead of SparkSession.builder
Configure the connection string for your database
Update imports
Replace pyspark.sql.functions with moltres.expressions.functions
Replace pyspark.sql.window with moltres.expressions.window
Column references
Use col("name") consistently (PySpark sometimes allows strings directly)
Import col from moltres
UDFs
Replace UDFs with SQL functions where possible
For complex logic, consider using SQL expressions or database functions
Data types
Moltres uses SQL types (INTEGER, TEXT, REAL, etc.)
Check type compatibility with your database
Testing
Test queries with df.to_sql() to see the generated SQL
Verify that results match the PySpark output
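One way to check that results match is to compare the collected rows as multisets, since row order is not guaranteed without an explicit sort. The helper below is a minimal sketch: same_rows and as_dict are hypothetical names, not part of either library. It assumes scalar column values and relies only on PySpark's Row.asDict() and on Moltres returning dicts, as described under Collecting Results.

```python
from collections import Counter


def as_dict(row):
    # PySpark Row objects expose asDict(); dicts pass through unchanged.
    return row.asDict() if hasattr(row, "asDict") else dict(row)


def same_rows(left, right, float_digits=6):
    """Compare two collections of rows as multisets, ignoring row order."""
    def key(row):
        items = []
        for name, value in sorted(as_dict(row).items()):
            if isinstance(value, float):
                # Round floats so tiny engine-specific differences are ignored.
                value = round(value, float_digits)
            items.append((name, value))
        return tuple(items)

    return Counter(key(r) for r in left) == Counter(key(r) for r in right)


# Stand-ins for the output of df.collect() on each side.
spark_rows = [{"category": "books", "total": 40.0},
              {"category": "toys", "total": 5.0}]
moltres_rows = [{"category": "toys", "total": 5.0},
                {"category": "books", "total": 40.0000000001}]

print(same_rows(spark_rows, moltres_rows))  # True
```

Rounding is a blunt tolerance. For money or other exact values, compare decimals or integers instead.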
Common Patterns
Pattern 1: Filtering and Aggregation
PySpark:
df.filter(col("status") == "active") \
    .groupBy("category") \
    .agg(sum("amount").alias("total"))
Moltres:
df.where(col("status") == "active") \
    .group_by("category") \
    .agg(sum(col("amount")).alias("total"))
Pattern 2: Window Functions
PySpark:
window = Window.partitionBy("department").orderBy(col("salary").desc())
df.withColumn("rank", row_number().over(window))
Moltres:
window = Window.partition_by(col("department")).order_by(col("salary").desc())
df.with_column("rank", row_number().over(window))
Pattern 3: Complex Joins
PySpark:
df1.join(df2, df1.id == df2.customer_id, "left")
Moltres:
df1.join(df2, on=[col("df1.id") == col("df2.customer_id")], how="left")
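A left join on differently named key columns corresponds to a LEFT JOIN ... ON clause in SQL. The sketch below shows the expected row shape using Python's built-in sqlite3 module. The customers and orders tables and their rows are invented for illustration, and the query is a hand-written equivalent, not Moltres output.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER, name TEXT)")
conn.execute("CREATE TABLE orders (customer_id INTEGER, amount REAL)")
conn.executemany("INSERT INTO customers VALUES (?, ?)", [(1, "Ada"), (2, "Bob")])
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 10.0), (1, 5.0)])

# Every customer is kept; customers without orders get NULL (None) amounts.
sql = """
SELECT c.id, c.name, o.amount
FROM customers AS c
LEFT JOIN orders AS o ON c.id = o.customer_id
ORDER BY c.id, o.amount
"""
rows = conn.execute(sql).fetchall()
for row in rows:
    print(row)
# (1, 'Ada', 5.0)
# (1, 'Ada', 10.0)
# (2, 'Bob', None)
```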
Limitations
No distributed processing - Moltres executes in the database, not on a cluster
No UDFs - Use SQL functions instead
Limited complex types - Array/Map support varies by database
SQL dialect differences - Some operations may not work on all databases
Benefits
No infrastructure - No Spark cluster needed
Better performance for database-resident data - SQL pushdown lets the database's own optimizer and indexes do the work and avoids moving data out of the database
Simpler deployment - Just connect to your existing database
Cost effective - No cluster maintenance costs
Getting Help
Check the API documentation
Review examples