Objective: Build a working end-to-end data engineering pipeline using pure Python. You’ll extract, clean, transform, model, and visualize data — mirroring how real data engineers work.
| Stage | Folder | Core Concept | Output |
|---|---|---|---|
| 1️⃣ Extract | `etl/extract.py` | Getting data from source | `data/00-raw/*.csv` |
| 2️⃣ Load | `etl/load.py` | Cleaning + validating | `data/01-clean/*.parquet` |
| 3️⃣ Transform | `etl/transform.py` | Modeling (fact & dimension) | `data/02-model/fact_sales.parquet` |
| 4️⃣ Build | `etl/build.py` | Aggregation + gold layer | `data/02-model/sales_by_*.parquet` |
| 5️⃣ Visualize | `app.py` | Dashboard + DQ view | Streamlit web app |
If you haven’t already:

```bash
git clone https://github.com/YOURNAME/py-dataengineering-workshop.git
cd py-dataengineering-workshop
uv venv && uv sync
```

Create the folder structure automatically by running the base pipeline once:

```bash
uv run python -m etl.run --stage extract
```

📄 File: `etl/extract.py`
Extraction is the first step of every data pipeline:
- Fetches raw files (CSV, JSON, etc.)
- Validates they exist
- Stores them in a raw layer (`data/00-raw/`)

Key concepts:
- File I/O using `urllib.request`
- Idempotency: avoids re-downloading if a file already exists
- Standardized directory layout (Bronze / Raw Layer)
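Under those constraints, the download logic in `etl/extract.py` can be sketched roughly as follows (the function name and `RAW_DIR` constant here are illustrative, not the workshop's actual code):

```python
from pathlib import Path
from urllib.request import urlretrieve

RAW_DIR = Path("data/00-raw")  # Bronze / Raw layer


def extract(name: str, url: str) -> Path:
    """Download one source CSV into the raw layer, skipping files that already exist."""
    RAW_DIR.mkdir(parents=True, exist_ok=True)
    dest = RAW_DIR / f"{name}.csv"
    if dest.exists():
        # Idempotency: a second run is a no-op, so the stage is safe to re-run
        return dest
    print(f"[EXTRACT] Downloading {url} -> {dest}")
    urlretrieve(url, dest)
    return dest
```

Because the existence check comes before the network call, re-running the stage costs nothing once the raw files are in place.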
Run it:

```bash
uv run python -m etl.run --stage extract
```

Expected output:

```
[EXTRACT] Downloading .../customers.csv → data/00-raw/Customers.csv
[EXTRACT] Loaded: customers, orders, od, products
```

Check the raw data:

```bash
ls data/00-raw
```

Each file corresponds to a Northwind table.
📄 File: `etl/load.py`
📄 Linked helper: `etl/dq.py`
Data loading is about preparing data for reliable downstream use:
- Renames columns to consistent schema
- Validates required fields
- Converts data types
- Logs Data Quality (DQ) checks
DQ checks include:
- Null or invalid `OrderID`
- Negative `Quantity` or `UnitPrice`
- `Discount` out of range (0–1)
- Missing foreign keys (`CustomerID` mismatch)
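A minimal sketch of how `etl/dq.py` might express these checks with pandas (the function name and exact messages are assumptions, not the workshop's actual code):

```python
import pandas as pd


def dq_checks(customers: pd.DataFrame, orders: pd.DataFrame, od: pd.DataFrame) -> list:
    """Return a list of human-readable issues; an empty list means all checks passed."""
    issues = []
    if orders["OrderID"].isna().any():
        issues.append("orders: null or invalid OrderID")
    if (od["Quantity"] <= 0).any() or (od["UnitPrice"] < 0).any():
        issues.append("order_details: negative Quantity or UnitPrice")
    if ((od["Discount"] < 0) | (od["Discount"] > 1)).any():
        issues.append("order_details: Discount out of range (0-1)")
    if not orders["CustomerID"].isin(customers["CustomerID"]).all():
        issues.append("orders: CustomerID missing from customers (FK mismatch)")
    return issues
```

Returning plain strings rather than raising keeps the pipeline running while still leaving an audit trail in the DQ logs.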
Run it:

```bash
uv run python -m etl.run --stage load
```

Output:

```
[LOAD] Written clean parquet + DQ. Issues: []
```

```bash
ls data/01-clean
```

Look for:
- `customers.parquet`
- `orders.parquet`
- `order_details.parquet`
- `_dq/dq_runs.parquet`
- `_dq/dq_issues.parquet`
- Why use Parquet instead of CSV? (columnar, compressed)
- Why have data quality logs?
- What does a “clean” dataset look like?
📄 File: `etl/transform.py`
Transformation is where we combine and enrich data:
- Joins cleaned tables
- Creates facts (sales) and dimensions (customers)
- Adds calculated columns
| Model Type | Example | Purpose |
|---|---|---|
| Fact Table | `fact_sales` | Transactional metrics |
| Dimension Table | `dim_customer` | Attributes like company name, country |
| OBT (One Big Table) | `fact_sales` joined with dim tables | Simpler analytics |
Run it:

```bash
uv run python -m etl.run --stage transform
```

This runs:
- Load → Clean data
- Transform → Join + Compute `line_amount`

Check new outputs in `data/02-model/`:
- `dim_customer.parquet`
- `fact_sales.parquet`
- What’s the role of a Customer dimension?
- Why compute `line_amount = UnitPrice * Quantity * (1 - Discount)`?
- How does this mirror “star schema” modeling?
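The core of the transform, a join plus the `line_amount` measure, can be sketched like this (`build_fact_sales` is an illustrative name; the join key follows the Northwind schema):

```python
import pandas as pd


def build_fact_sales(orders: pd.DataFrame, order_details: pd.DataFrame) -> pd.DataFrame:
    """Join orders to their line items and add the line_amount measure."""
    fact = order_details.merge(orders, on="OrderID", how="inner")
    # Net revenue per line: Discount is a fraction in [0, 1]
    fact["line_amount"] = fact["UnitPrice"] * fact["Quantity"] * (1 - fact["Discount"])
    return fact
```

The fact table keeps `CustomerID` as a foreign key, so descriptive attributes stay in `dim_customer`, exactly the star-schema split the question above points at.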
📄 File: `etl/build.py`
The Build step aggregates your fact table into analytical outputs, often called gold tables or data marts.
This mimics real business metrics:
- Total sales per customer
- Total sales per country
- Total sales per product
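Each of these gold tables is a single groupby over the fact table; a sketch (function and column names are assumptions):

```python
import pandas as pd


def sales_by(fact: pd.DataFrame, key: str) -> pd.DataFrame:
    """Aggregate line_amount by the given key into a gold table."""
    return (
        fact.groupby(key, as_index=False)["line_amount"]
        .sum()
        .rename(columns={"line_amount": "total_sales"})
        .sort_values("total_sales", ascending=False)
    )
```

Calling it three times, e.g. with `"CustomerID"`, `"Country"`, and `"ProductID"`, would yield the three outputs this stage writes.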
Run it:

```bash
uv run python -m etl.run --stage build
ls data/02-model
```

You’ll now have:
- `sales_by_customer.parquet`
- `sales_by_country.parquet`
- `sales_by_product.parquet`
- Why aggregate instead of querying raw data directly?
- How does this improve performance and reuse?
📄 File: `app.py`
Visualization completes the loop: data → insight.
The dashboard reads the modeled outputs (/02-model) and DQ logs (/01-clean/_dq).
Tabs:
- Sales (Customers) – top customers & products
- Sales by Country – regional view
- Data Quality – validation history
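The wiring of those tabs can be sketched as below (column names and chart choices are assumptions, not `app.py`'s actual code); the helper is kept outside the Streamlit calls so it stays testable:

```python
from pathlib import Path

import pandas as pd

MODEL_DIR = Path("data/02-model")


def top_n(df: pd.DataFrame, n: int = 10) -> pd.DataFrame:
    """Shared by the sales tabs: top-n rows by total_sales."""
    return df.sort_values("total_sales", ascending=False).head(n)


if __name__ == "__main__":
    import streamlit as st  # only needed when serving the dashboard

    tab_cust, tab_country, tab_dq = st.tabs(
        ["Sales (Customers)", "Sales by Country", "Data Quality"]
    )
    with tab_cust:
        df = pd.read_parquet(MODEL_DIR / "sales_by_customer.parquet")
        st.bar_chart(top_n(df).set_index("CustomerID")["total_sales"])
    with tab_country:
        df = pd.read_parquet(MODEL_DIR / "sales_by_country.parquet")
        st.bar_chart(df.set_index("Country")["total_sales"])
    with tab_dq:
        st.dataframe(pd.read_parquet("data/01-clean/_dq/dq_issues.parquet"))
```

Note that the dashboard only reads pre-aggregated Parquet, never the raw CSVs: the serving layer stays thin because the Build stage already did the heavy lifting.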
Run it:

```bash
uv run streamlit run app.py
```

Then open http://localhost:8501.
- Why visualize locally before BI deployment?
- How does this reflect a real “serving layer”?
- How would you scale this (Metabase, Superset, etc.)?
```bash
uv run python view_data.py
```

Shows:
- Head of data
- Schema and datatypes
- Summary stats
💡 Compare a Parquet vs CSV read time.
Edit `dq.py`:

```python
if (od["Quantity"] > 1000).any():
    issues.append("order_details: unusually high quantity (>1000)")
```

Re-run:

```bash
uv run python -m etl.run --stage load
```

Inspect `dq_issues.parquet` for new entries.
Ideas for experimentation:
- Add `Employees.csv` and create `dim_employee`
- Aggregate `sales_by_year`
- Add a time-based chart to Streamlit
- Export results as CSV in `/03-sandbox/`
| Mistake | Why it Happens | Fix |
|---|---|---|
| Forgetting to re-run after edits | Cached data | Delete `/data/01-clean` and `/02-model` |
| DQ logs overwritten | `write_dq_logs()` replaces the old file | Append or version logs |
| Missing dependencies | Environment mismatch | Use `uv sync` |
| “Streamlit not found” | Ran outside the venv | Use `uv run streamlit run app.py` |
| File not found | Missing extract step | Run `--stage extract` first |
✅ Local ETL pipeline ✅ Cleaned + modeled data ✅ Reproducible structure ✅ Basic governance (DQ + logs) ✅ Working dashboard
| Goal | Tool / Concept |
|---|---|
| Add orchestration | Prefect or Airflow |
| Move data to a DB | DuckDB or Postgres |
| Incremental transforms | dbt |
| Add monitoring | Grafana, Slack alerts |
| Query across layers | Ibis |
| Serve features to ML | Feature stores |
Key Takeaways
- Data engineering = building trust in data.
- Good DE practice = clean, validated, modular pipelines.
- Even pure Python can mirror modern data stacks.
- You’ve built a complete mini stack: CSV → Parquet → Dashboard.
💬 “You don’t need big data to learn data engineering — you just need structure, consistency, and curiosity.”
End of Workshop — Happy Engineering! 🧩