An end-to-end retail data warehouse built on the Medallion Architecture, orchestrated with Apache Airflow, transformed with dbt, and served to Power BI for business intelligence.
Retail operations generate data across multiple touchpoints: sales transactions, customer registrations, product returns, inventory movements, staff assignments, and payment settlements. Without a unified, reliable data foundation, answering even basic business questions becomes slow and error-prone:
- Which stores are underperforming, and why?
- Which products have the highest return rates?
- Are we collecting payment on all delivered orders?
- Which items are below reorder level across which stores?
- How has a customer's address or loyalty status changed over time, and did we bill them correctly at the time of purchase?
Scattered CSVs, ad-hoc queries, and manual reports cannot answer these questions consistently or at scale. This project solves that by building a production-grade data warehouse that:
- Ingests raw retail data incrementally and safely, without full reloads
- Cleans, validates, and standardizes data through a layered pipeline
- Preserves historical changes to customers and products (SCD Type 2)
- Enforces data quality gates before any layer feeds the next
- Delivers a star schema optimized for BI queries and dashboards

Contents:
- Architecture Overview
- Tech Stack
- Repository Structure
- Pipeline Orchestration
- Data Layers
- Data Model
- Incremental Loading Strategy
- Data Quality Gates
- Getting Started
- Environment Variables
- Key Design Decisions
┌────────────────────────────────────────────────────────────────────┐
│ Apache Airflow (Orchestrator) │
│ Mon / Wed / Fri — 1:00 PM IST │
└──────┬─────────────────┬─────────────────┬─────────────────┬───────┘
│ │ │ │
extract_data transform_data load_data send_email
│ │ │
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ BRONZE │────►│ SILVER │────►│ GOLD │────► Power BI
│ (Raw) │ │ (Cleaned) │ │ (Star │
└───────────┘ └───────────┘ │ Schema) │
│ └───────────┘
▼
┌───────────┐
│ SNAPSHOTS │
│(SCD Typ2) │
└───────────┘
Data flows from synthetic CSV sources through four pipeline stages. Each stage is independently tested, gated on data quality, and safe to re-run. A failed stage stops the pipeline, so downstream layers never run against untrusted data.
| Component | Technology |
|---|---|
| Data Generation | Python Faker |
| Data Storage | CSV Files |
| Extraction | Python, pandas |
| Data Warehouse | PostgreSQL |
| Transformation | dbt (data build tool) |
| Orchestration | Apache Airflow |
| Analytics / BI | Power BI |
| Language | Python 3, SQL |
| Containerization | Docker, Docker Compose |
| Version Control | Git, GitHub |
E-com/
├── .git # (Not Committed)
├── .venv # (Not Committed)
├── .vscode # (Not Committed)
├── airflow/
│ ├── config
│ │ └── airflow.cfg # Main airflow configuration file.
│ ├── dags/
│ │ └── pipeline.py # Airflow dag for orchestration.
│ ├── logs # Airflow logs (Not Committed)
│ │
│ └── plugins/ # Extend airflow functionality
│ ├── docker-compose.yaml # Airflow infrastructure orchestration
│ └── .env # Airflow environment variables (Not Committed)
│
├── configs/ # Centralized configuration.
│ ├── __init__.py # Makes the directory behave like a Python package.
│ ├── config.py # Synthetic data generation configuration settings.
│ └── settings.py # Centralized configuration module.
│
├── docs/ # Stores project documentation, reports, and data model explanations.
│ ├── architecture.md
│ ├── data_flow.png
│ └── incremental_loading.md
│
├── generators/ # Generates synthetic retail datasets using Faker.
│ └── run_all.py # Executes all data generators in dependency order.
│
├── logs/ # Separate logs for each ETL stage
│ ├── Extraction
│ │ └── __main___2026-06-01_13-42.log
│ ├── Transformation
│ │ └── __main___2026-06-02_17-25.log
│ └── Loading
│ └── __main___2026-06-05_12-11.log
│
├── notebooks/ # EDA for bronze layer before transformation.
│ └── exploratory_data_analysis.ipynb
│
├── reports/ # Business and data quality reports
│ ├── category_report.md
│ ├── customers.report.md
│ ├── product_report.md
│ └── sales_report.md
│
├── retail_data_warehouse/ # dbt Core project for transformation.
│ ├── analyses/ # Analytical SQL queries on Gold-layer models.
│ ├── dbt_packages/ # (Not Committed)
│ ├── logs/ # (Not Committed)
│ ├── macros/ # Custom macro that controls how dbt generates schema names.
│ │ └── generate_schema.sql
│ ├── models # Medallion Architecture Model.
│ │ ├── Bronze
│ │ ├── Silver
│ │ └── Gold
│ ├── seeds/
│ ├── snapshots/ # SCD Type 2 history tracking.
│ │ ├── snap_customers.sql
│ │ └── snap_products.sql
│ ├── target/ # Generated by dbt during execution (Not Committed).
│ ├── test/ # Stores custom data quality tests in dbt.
│ ├── .gitignore
│ ├── dbt_project.yml
│ ├── packages.yml
│ └── README.md
│
├── scripts/ # Stores ETL and automation scripts.
│ ├── extraction
│ │ └── extract.py
│ ├── transformation
│ │ └── transform.py
│ ├── loading
│ │ └── load.py
│ └── utils
│ ├── database_connection.py
│ └── logger.py
│
├── watermark/ # Per-table JSON watermark files + DQ failure reports.
│ ├── <table>.json # One file per Bronze table (e.g. orders.json)
│ ├── transformation/ # Silver DQ failure reports written here on gate failure.
│ └── loading/ # Gold DQ failure reports written here on gate failure.
│
├── .env # Project environment variables (Not Committed)
├── .gitignore
├── .python-version
├── LICENSE
├── main.py
├── pyproject.toml # Stores Python project settings and dependencies
├── README.md
└── uv.lock # (Not Committed)
The pipeline is orchestrated by Apache Airflow with the following configuration:
| Property | Value |
|---|---|
| DAG ID | ecommerce_etl_pipeline |
| Schedule | Mon, Wed, Fri at 1:00 PM IST |
| Cron Expression | 0 13 * * MON,WED,FRI |
| Timezone | Asia/Kolkata |
| Retries | 1 (5-minute delay) |
| On Failure | No email alert |
| On Success | HTML summary email notification |
extract_data ──► transform_data ──► load_data ──► send_email
| Task | Script | Description |
|---|---|---|
| extract_data | scripts/extraction/extract.py | Incremental upsert of raw data into Bronze |
| transform_data | scripts/transformation/transform.py | dbt snapshot → silver run → silver test (90% gate) |
| load_data | scripts/loading/load.py | dbt gold run → gold test (92% gate) |
| send_email | Airflow EmailOperator | HTML success notification |
The Bronze layer is the raw landing zone. Data is loaded from CSV files exactly as generated: no renaming, no casting, no filtering. The extractor adds one surrogate column (_etl_id SERIAL) and uses upsert to make every run idempotent.
Source Tables:
| Bronze Table | Source File | Primary Key |
|---|---|---|
| bronze.categories | categories.csv | id |
| bronze.customers | customers.csv | id |
| bronze.order_items | order_items.csv | id |
| bronze.orders | orders.csv | id |
| bronze.payments | payments.csv | id |
| bronze.product_returns | product_returns.csv | id |
| bronze.products | products.csv | id |
| bronze.staffs | staffs.csv | id |
| bronze.stocks | stocks.csv | id |
| bronze.stores | stores.csv | id |
Row outcomes tracked per run: inserted, updated, skipped, failed. A non-zero failed count exits with code 1 and triggers an Airflow retry.
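The idempotent upsert and the per-row outcome counts described above can be sketched as follows. This is a minimal illustration, not the project's extract.py: it uses the standard-library sqlite3 module as a stand-in for PostgreSQL, and the customers table, its columns, and the upsert_rows helper are hypothetical names chosen for the example.

```python
import sqlite3


def upsert_rows(conn, rows):
    """Upsert (id, name, updated_at) rows and count the outcome of each row."""
    counts = {"inserted": 0, "updated": 0, "skipped": 0, "failed": 0}
    for row in rows:
        try:
            existing = conn.execute(
                "SELECT name, updated_at FROM customers WHERE id = ?", (row[0],)
            ).fetchone()
            if existing is None:
                outcome = "inserted"
            elif existing == (row[1], row[2]):
                outcome = "skipped"  # identical row already present
            else:
                outcome = "updated"
            if outcome != "skipped":
                # The primary key is the conflict target, as in the Bronze layer.
                conn.execute(
                    "INSERT INTO customers (id, name, updated_at) VALUES (?, ?, ?) "
                    "ON CONFLICT(id) DO UPDATE SET name = excluded.name, "
                    "updated_at = excluded.updated_at",
                    row,
                )
            counts[outcome] += 1
        except sqlite3.Error:
            counts["failed"] += 1
    conn.commit()
    return counts


# Hypothetical demo data: loading the same batch twice changes nothing.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)"
)
batch = [(1, "Asha", "2026-05-30T10:00:00"), (2, "Ravi", "2026-05-30T11:00:00")]
first = upsert_rows(conn, batch)
second = upsert_rows(conn, batch)
```

Running the same batch a second time reports every row as skipped and leaves the table unchanged, which is the property that makes a retry safe.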
The Silver layer is managed by transform.py, which runs three dbt steps in strict order:
dbt snapshot ──► dbt run --select silver ──► dbt test --select silver
Silver models use the incremental materialization with a merge strategy and a 3-day lookback buffer to catch late-arriving Bronze rows. Every model applies a full CTE chain:
- raw_cleaned: casts IDs and timestamps to prevent type errors in later steps
- incremental_filter: applies the 3-day lookback window (skipped on first run)
- duplicate_check: ROW_NUMBER() partitioned by PK, ordered by updated_at DESC
- fixed: keeps only rnk = 1
- email_cleaned: corrects common typos (gmail.con → gmail.com) and nullifies malformed emails
- email_batch_deduped: nullifies emails that appear more than once in a single batch
- Final SELECT: applies all business rules and casts to final types
Key transformation rules: type casting, safe date validation, deduplication, null coercion, string standardization (TRIM, LOWER, INITCAP), phone validation (^(91)?[6-9][0-9]{9}$), status validation, and logical date correction.
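To make two of these rules concrete, here is a small Python sketch of the phone and email checks. The phone pattern is the one quoted above; the email shape check, the typo map, and the function names are assumptions made for illustration, and the real rules live in the Silver SQL models.

```python
import re

# Pattern quoted in the transformation rules: optional 91 prefix, then a
# 10-digit number starting with 6-9.
PHONE_RE = re.compile(r"^(91)?[6-9][0-9]{9}$")
# Assumed, deliberately loose shape check for an email address.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[a-z]{2,}$")
# Only the typo named in the text; a real map would likely hold more entries.
TYPO_FIXES = {"gmail.con": "gmail.com"}


def clean_phone(raw):
    """Return the trimmed phone number if it matches the pattern, else None."""
    if raw is None:
        return None
    value = raw.strip()
    return value if PHONE_RE.fullmatch(value) else None


def clean_email(raw):
    """Lowercase and trim, fix known domain typos, nullify malformed values."""
    if raw is None:
        return None
    email = raw.strip().lower()
    for wrong, right in TYPO_FIXES.items():
        if email.endswith("@" + wrong):
            email = email[: -len(wrong)] + right
    return email if EMAIL_RE.fullmatch(email) else None
```

Invalid values become None rather than raising, which mirrors the null-coercion rule: a bad phone number should not block the rest of the row.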
Two entities change over time — customers and products. dbt snapshot models capture the full history of every change so the Gold layer can reconstruct the exact state of any customer or product at the time a historical transaction occurred.
| Snapshot | Source | Unique Key | Strategy | Hard Deletes |
|---|---|---|---|---|
| snap_customers | silver.customers | customer_id | timestamp on updated_at | Yes |
| snap_products | silver.products | product_id | timestamp on updated_at | Yes |
dbt adds dbt_valid_from, dbt_valid_to, dbt_is_current, dbt_updated_at, and dbt_scd_id to each snapshot table. Setting invalidate_hard_deletes=True closes a record (by setting its dbt_valid_to) if its source row disappears from Silver.
Important: Snapshots run before Silver models in each pipeline run. They read Silver as it was at the end of the previous run and capture any changes that accumulated since then.
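The point-in-time reconstruction that the snapshots enable can be illustrated with a short Python sketch. It assumes snapshot rows are available as dictionaries carrying dbt_valid_from and dbt_valid_to; the version_at helper and the sample history are hypothetical, and in the warehouse this lookup would be a SQL join rather than a Python loop.

```python
from datetime import datetime


def version_at(history, key, as_of):
    """Return the snapshot row for `key` that was valid at `as_of`, else None."""
    for row in history:
        if row["customer_id"] != key:
            continue
        started = row["dbt_valid_from"] <= as_of
        # An open-ended row (dbt_valid_to is None) is the current version.
        not_ended = row["dbt_valid_to"] is None or as_of < row["dbt_valid_to"]
        if started and not_ended:
            return row
    return None


# Hypothetical history: customer 7 moved from Pune to Mumbai on 2026-04-01.
history = [
    {
        "customer_id": 7,
        "city": "Pune",
        "dbt_valid_from": datetime(2026, 1, 1),
        "dbt_valid_to": datetime(2026, 4, 1),
    },
    {
        "customer_id": 7,
        "city": "Mumbai",
        "dbt_valid_from": datetime(2026, 4, 1),
        "dbt_valid_to": None,
    },
]
order_time = datetime(2026, 2, 15)
city_at_order = version_at(history, 7, order_time)["city"]
```

The half-open interval (valid_from inclusive, valid_to exclusive) ensures that a timestamp falling exactly on a change boundary matches one version only.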
The Gold layer is managed by load.py, which runs two dbt steps:
dbt run --select gold ──► dbt test --select gold
Gold reads directly from Silver. A failed Silver run must never be followed by a Gold run.
Dimension Tables:
| Table | Grain | Source |
|---|---|---|
| dim_customer | One row per customer | silver.customers |
| dim_product | One row per product | silver.products + silver.categories |
| dim_staff | One row per staff member + sentinel row (id = -1) | silver.staffs |
| dim_store | One row per store | silver.stores |
Fact Tables:
| Table | Grain | Materialization | Key Sources |
|---|---|---|---|
| fct_sales | One row per order line item | Incremental merge | silver.order_items + silver.orders |
| fct_payments | One row per payment | Incremental merge | silver.payments + silver.orders |
| fct_returns | One row per return | Incremental merge | silver.product_returns + silver.orders |
| fct_inventory | One row per (store, product) | Full refresh | silver.stocks |
dim_customer
│
┌──────────────────┼──────────────────┐
│ │ │
fct_sales fct_payments fct_returns
│ │
┌────┼────┐ │
│ │ │ │
dim_store │ dim_staff dim_product
dim_product │
fct_inventory
┌────┴────┐
dim_store dim_product
Cross-fact relationships:
| Fact A | Fact B | Shared Key | Relationship |
|---|---|---|---|
| fct_sales | fct_returns | order_item_id | One-to-one |
| fct_sales | fct_payments | order_id | One-to-many |
| fct_sales | fct_inventory | product_id + store_id | Many-to-one |
Revenue calculation in fct_sales:
gross_revenue = quantity × unit_price
discount_amount = quantity × unit_price × (discount_percent / 100)
net_revenue = line_total (sourced from Silver, pre-validated)
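As a worked example of these formulas, the sketch below computes the three fields for a single line item using Decimal to avoid floating-point drift. The function name and the sample figures are hypothetical; note that net_revenue is passed through from line_total rather than recomputed, as stated above.

```python
from decimal import Decimal


def revenue_fields(quantity, unit_price, discount_percent, line_total):
    """Compute the fct_sales revenue columns for one order line item."""
    gross = quantity * unit_price
    discount = gross * discount_percent / Decimal("100")
    return {
        "gross_revenue": gross,
        "discount_amount": discount,
        "net_revenue": line_total,  # taken from Silver, not derived here
    }


# Hypothetical line item: 3 units at 200.00 with a 10% discount.
row = revenue_fields(3, Decimal("200.00"), Decimal("10"), Decimal("540.00"))
reconciles = row["gross_revenue"] - row["discount_amount"] == row["net_revenue"]
```

In this example gross minus discount equals the pre-validated line_total; whether the project enforces that reconciliation as a test is not stated here, so treat the final check as an illustration only.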
The pipeline avoids full reloads at every layer. Each layer uses a different mechanism suited to its tooling:
| Layer | Mechanism | Filter Column | Lookback Buffer |
|---|---|---|---|
| Bronze | JSON watermark file per table | updated_at | None |
| Silver | dbt incremental merge | updated_at | 3 days |
| Gold | Full rebuild from current Silver | N/A | N/A |
Bronze watermark file structure:
{
"table": "orders",
"ts_col": "updated_at",
"last_watermark": "2026-05-30T14:22:00",
"last_run": "2026-05-31T05:30:00",
"rows_loaded": 1200
}

The watermark is only advanced after at least one row is committed. Zero-row runs leave the watermark unchanged, making the operation fully idempotent.
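The advance-only-on-commit rule can be sketched in Python as below. The field names follow the watermark file shown above; the advance_watermark helper, the temporary-file-then-rename write, and the demo path are assumptions made for this example rather than the project's actual implementation.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def advance_watermark(path, table, ts_col, new_max_ts, rows_loaded):
    """Write a new watermark only when rows were committed; return True if written."""
    if rows_loaded == 0:
        return False  # zero-row run: leave the existing file untouched
    state = {
        "table": table,
        "ts_col": ts_col,
        "last_watermark": new_max_ts,
        "last_run": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S"),
        "rows_loaded": rows_loaded,
    }
    # Assumption: write to a temp file and rename, so a crash mid-write
    # cannot leave a half-written watermark behind.
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(state, indent=2))
    tmp.replace(path)
    return True


# Hypothetical demo in a throwaway directory.
wm_path = Path(tempfile.mkdtemp()) / "orders.json"
wrote = advance_watermark(wm_path, "orders", "updated_at", "2026-05-30T14:22:00", 1200)
skipped = advance_watermark(wm_path, "orders", "updated_at", "2026-06-01T09:00:00", 0)
```

After the second call the file still holds the first watermark, so re-running an empty extraction never moves the cutoff forward.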
Silver's 3-day lookback guards against late-arriving records:
WHERE updated_at_clean >=
COALESCE(
(SELECT MAX(updated_at) FROM {{ this }}),
TIMESTAMP '1900-01-01'
) - INTERVAL '3 days'

Bypass flags for full reloads:
# Full Bronze reload (ignores watermark)
python -m scripts.extraction.extract datasets/ --full-load
# Full Silver and Gold rebuild
python -m scripts.transformation.transform --full-refresh
python -m scripts.loading.load --full-refresh

Every layer is gated by data quality tests before the next layer runs. The pipeline halts and exits with code 1 if the gate fails.
| Layer | Tests | Pass Threshold | Failure Report |
|---|---|---|---|
| Silver | 200 dbt tests | 90% (PASS + WARN) | watermark/transformation/dq_failure_YYYYMMDD_HHMMSS.json |
| Gold | dbt tests | 92% (PASS + WARN) | watermark/loading/dq_failure_gold_YYYYMMDD_HHMMSS.json |
Gold is held to a stricter threshold (92%) because it is consumed directly by BI tools.
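The gate logic can be sketched as a pass-rate check in which PASS and WARN both count toward the threshold. This is a minimal illustration: the gate_passes helper is hypothetical, the status strings are assumed to come from dbt's test results, and the sample counts are invented for the example rather than taken from a real run.

```python
from collections import Counter


def gate_passes(statuses, threshold):
    """Return (pass_rate, ok), counting both pass and warn as passing."""
    counts = Counter(status.lower() for status in statuses)
    total = sum(counts.values())
    if total == 0:
        return 0.0, False  # assumption: an empty test run fails the gate
    rate = (counts["pass"] + counts["warn"]) / total
    return rate, rate >= threshold


# Hypothetical Silver run: 200 tests, 6 warnings, 14 failures.
statuses = ["pass"] * 180 + ["warn"] * 6 + ["fail"] * 14
rate, ok = gate_passes(statuses, 0.90)
exit_code = 0 if ok else 1  # a non-zero exit code halts the pipeline
```

Treating an empty result set as a failure is a design choice made here so that a misconfigured selector cannot silently pass the gate.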
Known acceptable warnings (Silver — 6 documented source gaps):
| Warning | Reason |
|---|---|
| not_null_product_returns_return_date | 19 returns missing a return date at source |
| not_null_staffs_salary | 2 staff records missing salary at source |
| not_null_stores_postal_code | 1 store record missing a postal code at source |
| assert_orphan_delivered_orders_without_payments | 34 delivered orders with no payment record |
| assert_orphan_orders_without_items | 32 orders with no line items |
| assert_rel_order_staff_works_at_order_store | 17 orders where staff belong to a different store |
Threshold by environment:
| Environment | Silver | Gold | How to Set |
|---|---|---|---|
| DEV / CI | 0.85 | 0.85 | DBT_PASS_THRESHOLD=0.85 |
| Staging | 0.90 | 0.90 | DBT_PASS_THRESHOLD=0.90 |
| Production | 0.95 | 0.95 | DBT_PASS_THRESHOLD=0.95 |
- Python 3.11+
- Docker and Docker Compose
- PostgreSQL (or Docker-based instance)
- uv (Python package manager)
# 1. Clone the repository
git clone https://github.com/<your-username>/E-com.git
cd E-com
# 2. Install Python dependencies
uv sync
# 3. Copy and configure environment variables
cp .env.example .env
# Edit .env with your PostgreSQL credentials
# 4. Start Airflow via Docker Compose
cd airflow/plugins
docker compose up -d
# 5. Generate synthetic data
python generators/run_all.py

# Run the full pipeline end-to-end
python main.py
# Or run each stage individually
python -m scripts.extraction.extract datasets/
python -m scripts.transformation.transform
python -m scripts.loading.load

cd retail_data_warehouse
dbt deps # Install dbt packages
dbt snapshot # Run SCD Type 2 snapshots
dbt run --select silver # Build Silver models
dbt test --select silver # Test Silver (90% gate)
dbt run --select gold # Build Gold models
dbt test --select gold # Test Gold (92% gate)

| Variable | Default | Description |
|---|---|---|
| ETL_SCHEMA | bronze | PostgreSQL schema for Bronze tables |
| ETL_TS_COL | updated_at | Timestamp column used for watermark filtering |
| ETL_PK_COL | id | Primary key column for upsert conflict target |
| WATERMARK_DIR | watermark | Directory for JSON watermark files |
| DBT_PASS_THRESHOLD | 0.90 | Silver DQ gate pass-rate threshold |
| DBT_GOLD_PASS_THRESHOLD | 0.92 | Gold DQ gate pass-rate threshold |
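One way to read these variables with their documented defaults is sketched below. The EtlSettings dataclass and load_settings function are hypothetical and may not match configs/settings.py; only the variable names and default values come from the table above.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class EtlSettings:
    schema: str
    ts_col: str
    pk_col: str
    watermark_dir: str
    silver_threshold: float
    gold_threshold: float


def load_settings(env=None):
    """Build settings from a mapping, defaulting to the process environment."""
    env = os.environ if env is None else env
    return EtlSettings(
        schema=env.get("ETL_SCHEMA", "bronze"),
        ts_col=env.get("ETL_TS_COL", "updated_at"),
        pk_col=env.get("ETL_PK_COL", "id"),
        watermark_dir=env.get("WATERMARK_DIR", "watermark"),
        silver_threshold=float(env.get("DBT_PASS_THRESHOLD", "0.90")),
        gold_threshold=float(env.get("DBT_GOLD_PASS_THRESHOLD", "0.92")),
    )


# Passing an explicit mapping keeps the example independent of the real shell.
defaults = load_settings({})
prod = load_settings({"DBT_PASS_THRESHOLD": "0.95"})
```

Accepting the mapping as a parameter makes the defaults easy to check in a unit test without touching the real environment.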
Medallion Architecture — Three layers with single responsibilities. A failure in Silver does not corrupt Bronze. Raw data is always recoverable. Each layer can be independently tested and rerun.
Incremental watermarks — Full table reloads become prohibitively slow as data grows. JSON watermarks keep extraction time proportional to new data volume, not total volume. The --full-load flag handles backfills cleanly.
SCD Type 2 via dbt snapshots — Customers change addresses; products change prices. Without history, a sales fact joined to a current dimension would reflect today's values rather than those at the time of the transaction, corrupting historical analysis.
dbt for transformation — dbt compiles SQL models into a dependency graph, handles execution ordering, and provides built-in testing for nulls, uniqueness, and referential integrity — without a separate orchestration layer for transformation logic.
Data quality gates — Without gates, a pipeline can succeed even when large numbers of tests are failing, causing silent data corruption to flow into BI reports. Gates with machine-readable failure reports allow Airflow callbacks and alerting integrations to react automatically.
Sentinel rows for unresolved FKs — A sentinel row (staff_id = -1) in dim_staff and COALESCE(staff_id, -1) in fct_sales keeps referential integrity intact while making unassigned orders explicitly visible and filterable in reports — rather than breaking joins or producing misleading nulls.
No dim_date — Time context is carried directly as DATE/TIMESTAMP columns on each fact table, keeping the schema lean. A dim_date can be added later if pre-computed fiscal periods, holidays, or week numbering are required.
This project is licensed under the terms of the LICENSE file included in this repository.
Built as a portfolio project demonstrating production-grade data engineering patterns: incremental loading, SCD Type 2, Medallion Architecture, dbt transformations, and automated data quality enforcement.