Build a practical Parquet ETL flow with filtering, enrichment, and grouped outputs.
Tutorial: ETL Pipeline
This tutorial shows a standard analytics ETL shape:
- Read raw records
- Filter invalid rows
- Derive columns
- Aggregate by business key
- Write Parquet outputs
Scenario
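You receive transaction records and need a daily, country-level revenue table. The code in this tutorial reads four columns from the raw data: `event_date`, `country`, `amount`, and `is_refund` (a boolean flag).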
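To pin down the expected input, here is a minimal sketch of the record shape the later steps read, written as a plain Python dataclass, plus a small column check you could run right after loading. `Transaction`, `REQUIRED_COLUMNS`, and `missing_columns` are hypothetical illustrations, not part of framex, and the real `transactions.parquet` file may carry more columns or different types.

```python
from dataclasses import dataclass
from datetime import date

# Hypothetical record type mirroring the columns this tutorial's code reads.
# The actual Parquet schema may contain more columns or use other types.
@dataclass(frozen=True)
class Transaction:
    event_date: date  # grouping key: calendar day of the transaction
    country: str      # grouping key, e.g. an ISO country code
    amount: float     # rows with amount <= 0 are dropped in Step 2
    is_refund: bool   # refund rows are dropped in Step 2

REQUIRED_COLUMNS = {"event_date", "country", "amount", "is_refund"}

def missing_columns(column_names):
    """Return the required columns absent from a table's column list, sorted."""
    return sorted(REQUIRED_COLUMNS - set(column_names))

print(missing_columns(["event_date", "country", "amount"]))  # ['is_refund']
```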
Step 1: Load Data
```python
import framex as fx

raw = fx.read_parquet("transactions.parquet")
```
Step 2: Filter Invalid and Refund Rows
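```python
# Keep positive, non-refund rows. The parentheses matter: `&` binds tighter than `>`.
filtered = raw.filter((raw["amount"] > 0) & (~raw["is_refund"]))
```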
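The Step 2 rule can be sketched row by row in plain Python, which makes the keep/drop logic easy to check on hand-made rows. This assumes dict rows with the tutorial's column names; it illustrates the rule, not how framex evaluates it. Note the switch from `&`/`~` to `and`/`not`: the keywords work on single Python values, whereas element-wise column expressions in pandas-style APIs rely on the overloadable `&` and `~` operators.

```python
# Plain-Python sketch of the Step 2 predicate on hypothetical dict rows.
def keep_row(row: dict) -> bool:
    # Same rule as (amount > 0) & (~is_refund), applied to one row.
    return row["amount"] > 0 and not row["is_refund"]

rows = [
    {"amount": 120.0, "is_refund": False},  # kept
    {"amount": 0.0, "is_refund": False},    # dropped: zero amount
    {"amount": -40.0, "is_refund": False},  # dropped: negative amount
    {"amount": 75.0, "is_refund": True},    # dropped: refund
]
filtered = [r for r in rows if keep_row(r)]
print(len(filtered))  # 1
```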
Step 3: Add Computed Columns
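```python
# Derive gross_amount from amount using the tutorial's fixed 1.07 multiplier (amount plus 7%).
enriched = filtered.assign(
    gross_amount=lambda d: d["amount"] * 1.07,
)
```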
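Step 3 applies the multiplier to every row. Because binary floating-point multiplication can leave tiny representation errors in currency values, one option, sketched here with Python's standard `decimal` module, is to compute the gross amount exactly and round to cents. The `gross_amount` helper and the rounding rule (`ROUND_HALF_UP` to two decimals) are illustrative choices, not something the tutorial specifies; whether framex columns can hold decimals is not covered here, although the Parquet format itself does define a DECIMAL logical type.

```python
from decimal import Decimal, ROUND_HALF_UP

MULTIPLIER = Decimal("1.07")  # same factor as Step 3, written as an exact decimal
CENT = Decimal("0.01")

def gross_amount(amount: str) -> Decimal:
    """Hypothetical helper: apply the multiplier exactly, then round to cents."""
    return (Decimal(amount) * MULTIPLIER).quantize(CENT, rounding=ROUND_HALF_UP)

# Amounts are passed as strings so they are never rounded to binary floats first.
for raw_amount in ["100.00", "19.99", "0.50"]:
    print(raw_amount, "->", gross_amount(raw_amount))
# 100.00 -> 107.00
# 19.99 -> 21.39
# 0.50 -> 0.54
```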
Step 4: Aggregate KPI Table
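```python
kpi = (
    enriched
    .groupby(["event_date", "country"])
    # The sort below assumes aggregates are named <column>_<function>, e.g. gross_amount_sum.
    .agg({"gross_amount": ["sum", "mean", "count"]})
    # Dates ascending; within each date, highest revenue first.
    .sort(["event_date", "gross_amount_sum"], ascending=[True, False])
)
```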
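To make Step 4 concrete, here is the same group-by and ordering written with only the standard library on a few made-up enriched rows. The output column names follow the `<column>_<function>` pattern the `sort` call relies on; `aggregate` is a hypothetical illustration, not framex's implementation. The mixed-direction ordering uses two stable sorts, secondary key first.

```python
from collections import defaultdict

# Hypothetical enriched rows (already filtered, gross_amount already computed).
rows = [
    {"event_date": "2024-01-01", "country": "DE", "gross_amount": 107.0},
    {"event_date": "2024-01-01", "country": "DE", "gross_amount": 53.5},
    {"event_date": "2024-01-01", "country": "FR", "gross_amount": 214.0},
    {"event_date": "2024-01-02", "country": "DE", "gross_amount": 10.7},
]

def aggregate(rows):
    # Collect gross_amount values per (event_date, country) key.
    groups = defaultdict(list)
    for r in rows:
        groups[(r["event_date"], r["country"])].append(r["gross_amount"])

    out = []
    for (event_date, country), values in groups.items():
        total = sum(values)
        out.append({
            "event_date": event_date,
            "country": country,
            "gross_amount_sum": total,
            "gross_amount_mean": total / len(values),
            "gross_amount_count": len(values),
        })

    # Secondary key first (sum, descending), then primary key (date, ascending).
    # Python's sort is stable, so the revenue order survives within each date.
    out.sort(key=lambda r: r["gross_amount_sum"], reverse=True)
    out.sort(key=lambda r: r["event_date"])
    return out

kpi = aggregate(rows)
for row in kpi:
    print(row["event_date"], row["country"], row["gross_amount_sum"], row["gross_amount_count"])
```

Sorting once with the key `(event_date, -gross_amount_sum)` would also work here, because the sum is numeric and can be negated.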
Step 5: Export
```python
fx.write_parquet(kpi, "outputs/country_daily_kpi.parquet")
```
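Optional: Lazy Plan for Long Pipelines
The same pipeline can also be built as a lazy plan: `raw.lazy()` starts the plan, each chained call adds a step, and `.collect()` materializes the result. Note that the lazy version derives the column with `with_column` rather than `assign`.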
```python
kpi_lazy = (
    raw.lazy()
    .filter(lambda d: (d["amount"] > 0) & (~d["is_refund"]))
    .with_column("gross_amount", lambda d: d["amount"] * 1.07)
    .groupby(["event_date", "country"])
    .agg({"gross_amount": ["sum", "mean", "count"]})
    # No sort step here, so row order may differ from the eager `kpi` table.
    .collect()
)
```
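The difference between the eager and lazy styles can be illustrated with a toy plan builder: each method records a step and returns a new plan, and nothing runs until `collect()`. `LazyPlan` and its methods are hypothetical and only mirror the shape of the `filter`/`with_column`/`collect` calls above; real lazy engines may also rewrite or optimize the plan, and this sketch says nothing about whether or how framex does.

```python
class LazyPlan:
    """Toy lazy pipeline over a list of dict rows (hypothetical, not framex)."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)  # recorded transformations, not yet applied

    def _then(self, step):
        # Return a new plan with one more step; the current plan is unchanged.
        return LazyPlan(self._rows, self._steps + (step,))

    def filter(self, predicate):
        return self._then(lambda rows: [r for r in rows if predicate(r)])

    def with_column(self, name, fn):
        return self._then(lambda rows: [{**r, name: fn(r)} for r in rows])

    def collect(self):
        # Only here are the recorded steps executed, in order.
        rows = list(self._rows)
        for step in self._steps:
            rows = step(rows)
        return rows


calls = []  # records every time the derived-column function actually runs

def gross(row):
    calls.append(row["amount"])
    return row["amount"] * 1.07

plan = (
    LazyPlan([{"amount": 100.0, "is_refund": False},
              {"amount": 5.0, "is_refund": True}])
    .filter(lambda r: r["amount"] > 0 and not r["is_refund"])
    .with_column("gross_amount", gross)
)
print(calls)  # [] : building the plan ran nothing
result = plan.collect()
print(calls)  # [100.0] : the refund row was filtered out before gross() ran
```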
Validation Tip
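At integration boundaries, cross-check the KPI table against the same computation done in pandas. Start by converting the result:
```python
# Convert the framex result to a pandas DataFrame for comparison.
pandas_result = kpi.to_pandas()
```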
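When cross-checking, matching rows by key and comparing floats with a tolerance avoids false alarms from row order (the lazy plan above does not sort) and from rounding differences caused by a different summation order. This is a minimal stdlib sketch that assumes both results have been turned into lists of dicts with the same key types on both sides (for a pandas DataFrame, `df.to_dict("records")` does that); `compare_kpis` and its `rel_tol` default are hypothetical choices.

```python
import math

def compare_kpis(actual, expected, keys=("event_date", "country"), rel_tol=1e-9):
    """Return human-readable mismatches between two lists of row dicts.

    Rows are matched by their key columns, so row order does not matter.
    Columns present only in `actual` are ignored.
    """
    def _index(rows):
        return {tuple(r[k] for k in keys): r for r in rows}

    a, e = _index(actual), _index(expected)
    problems = []
    for key in sorted(set(a) | set(e)):
        if key not in a:
            problems.append(f"missing in actual: {key}")
        elif key not in e:
            problems.append(f"unexpected in actual: {key}")
        else:
            for col, exp_val in e[key].items():
                act_val = a[key].get(col)
                if isinstance(exp_val, float):
                    # Floats: tolerate tiny rounding differences.
                    ok = act_val is not None and math.isclose(act_val, exp_val, rel_tol=rel_tol)
                else:
                    # Keys, counts, and other values: exact match.
                    ok = act_val == exp_val
                if not ok:
                    problems.append(f"{key} {col}: {act_val!r} != {exp_val!r}")
    return problems

actual = [{"event_date": "2024-01-01", "country": "DE",
           "gross_amount_sum": 0.3, "gross_amount_count": 2}]
expected = [{"event_date": "2024-01-01", "country": "DE",
             "gross_amount_sum": 0.1 + 0.2, "gross_amount_count": 2}]
print(compare_kpis(actual, expected))  # [] since 0.1 + 0.2 is within tolerance of 0.3
```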