Nitinx12/Retail_data_warehouse

Retail Analytics Data Warehouse

An end-to-end retail data warehouse built on the Medallion Architecture, orchestrated with Apache Airflow, transformed with dbt, and served to Power BI for business intelligence.


Business Problem

Retail operations generate data across multiple touchpoints: sales transactions, customer registrations, product returns, inventory movements, staff assignments, and payment settlements. Without a unified, reliable data foundation, answering even basic business questions becomes slow and error-prone:

  • Which stores are underperforming, and why?
  • Which products have the highest return rates?
  • Are we collecting payment on all delivered orders?
  • Which items are below reorder level across which stores?
  • How has a customer's address or loyalty status changed over time, and did we bill them correctly at the time of purchase?

Scattered CSVs, ad-hoc queries, and manual reports cannot answer these questions consistently or at scale. This project solves that by building a production-grade data warehouse that:

  • Ingests raw retail data incrementally and safely, without full reloads
  • Cleans, validates, and standardizes data through a layered pipeline
  • Preserves historical changes to customers and products (SCD Type 2)
  • Enforces data quality gates before any layer feeds the next
  • Delivers a star schema optimized for BI queries and dashboards

Table of Contents

  1. Architecture Overview
  2. Tech Stack
  3. Repository Structure
  4. Pipeline Orchestration
  5. Data Layers
  6. Data Model
  7. Incremental Loading Strategy
  8. Data Quality Gates
  9. Getting Started
  10. Environment Variables
  11. Key Design Decisions

Architecture Overview

┌────────────────────────────────────────────────────────────────────┐
│                   Apache Airflow (Orchestrator)                    │
│                   Mon / Wed / Fri — 1:00 PM IST                    │
└──────┬─────────────────┬─────────────────┬─────────────────┬───────┘
       │                 │                 │                 │
 extract_data      transform_data      load_data         send_email
       │                 │                 │
       ▼                 ▼                 ▼
 ┌───────────┐     ┌───────────┐     ┌───────────┐
 │  BRONZE   │────►│  SILVER   │────►│   GOLD    │────► Power BI
 │   (Raw)   │     │ (Cleaned) │     │  (Star    │
 └───────────┘     └───────────┘     │  Schema)  │
                         │           └───────────┘
                         ▼
                   ┌───────────┐
                   │ SNAPSHOTS │
                   │SCD Type 2 │
                   └───────────┘

Data flows from synthetic CSV sources through four pipeline stages. Each stage is independently tested, gated on data quality, and safe to re-run. A failed stage stops the pipeline, so downstream layers never run against untrusted data.
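
The fail-fast behaviour can be sketched in a few lines of Python. This is a minimal illustration, not the project's main.py: the names STAGES, run_command, and run_pipeline are hypothetical, and only the module paths are taken from this README.

```python
import subprocess
import sys
from typing import Callable, Sequence

# Stage commands as listed in this README; the wrapper itself is hypothetical.
STAGES: list[list[str]] = [
    [sys.executable, "-m", "scripts.extraction.extract", "datasets/"],
    [sys.executable, "-m", "scripts.transformation.transform"],
    [sys.executable, "-m", "scripts.loading.load"],
]


def run_command(cmd: Sequence[str]) -> int:
    """Run one stage as a subprocess and return its exit code."""
    return subprocess.run(list(cmd)).returncode


def run_pipeline(
    stages: Sequence[Sequence[str]] = STAGES,
    runner: Callable[[Sequence[str]], int] = run_command,
) -> int:
    """Run stages in order and stop at the first non-zero exit code."""
    for cmd in stages:
        code = runner(cmd)
        if code != 0:
            # Later stages are skipped so they never read untrusted data.
            return code
    return 0
```

Passing a fake runner makes the ordering logic easy to exercise without touching a database.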

Tech Stack

| Component | Technology |
| --- | --- |
| Data Generation | Python Faker |
| Data Storage | CSV Files |
| Extraction | Python, pandas |
| Data Warehouse | PostgreSQL |
| Transformation | dbt (data build tool) |
| Orchestration | Apache Airflow |
| Analytics / BI | Power BI |
| Language | Python 3, SQL |
| Containerization | Docker, Docker Compose |
| Version Control | Git, GitHub |

Repository Structure

E-com/
├── .git                                                    # (Not Committed)
├── .venv                                                   # (Not Committed)
├── .vscode                                                 # (Not Committed)
├── airflow/
│     ├── config
│     │      └── airflow.cfg                                # Main airflow configuration file.
│     ├── dags/
│     │      └── pipeline.py                                # Airflow dag for orchestration.
│     ├── logs                                              # Airflow logs (Not Committed)
│     │
│     └── plugins/                                          # Extend airflow functionality
│           ├── docker-compose.yaml                         # Airflow infrastructure orchestration
│           └── .env                                        # Airflow environment variables (Not Committed)
│
├── configs/                                                # Centralized configuration.
│     ├── __init__.py                                       # Makes the directory behave like a Python package.
│     ├── config.py                                         # Synthetic data generation configuration settings.
│     └── settings.py                                       # Centralized configuration module.
│
├── docs/                                                   # Stores project documentation, reports, and data model explanations.
│     ├── architecture.md
│     ├── data_flow.png
│     └── incremental_loading.md
│
├── generators/                                             # Generates synthetic retail datasets using Faker.
│       └── run_all.py                                      # Executes all data generators in dependency order.
│
├── logs/                                                   # Separate logs for each ETL stage
│     ├── Extraction
│     │      └── __main___2026-06-01_13-42.log
│     ├── Transformation
│     │      └── __main___2026-06-02_17-25.log
│     └── Loading
│             └── __main___2026-06-05_12-11.log
│
├── notebooks/                                              # EDA for bronze layer before transformation.
│     └── exploratory_data_analysis.ipynb
│
├── reports/                                                # Business and data quality reports
│     ├── category_report.md
│     ├── customers.report.md
│     ├── product_report.md
│     └── sales_report.md
│
├── retail_data_warehouse/                                  # dbt Core project for transformation.
│     ├── analyses/                                         # Analytical SQL queries on Gold-layer models.
│     ├── dbt_packages/                                     # (Not Committed)
│     ├── logs/                                             # (Not Committed)
│     ├── macros/                                           # Custom macro that controls how dbt generates schema names.
│     │     └── generate_schema.sql
│     ├── models                                            # Medallion Architecture Model.
│     │     ├── Bronze
│     │     ├── Silver
│     │     └── Gold
│     ├── seeds/
│     ├── snapshots/                                        # SCD Type 2 history tracking.
│     │      ├── snap_customers.sql
│     │      └── snap_products.sql
│     ├── target/                                           # Generated by dbt during execution (Not Committed).
│     ├── test/                                             # Stores custom data quality tests in dbt.
│     ├── .gitignore
│     ├── dbt_project.yml
│     ├── packages.yml
│     └── README.md
│
├── scripts/                                                # Stores ETL and automation scripts.
│     ├── extraction
│     │      └── extract.py
│     ├── transformation
│     │      └── transform.py
│     ├── loading
│     │      └── load.py
│     └── utils
│            ├── database_connection.py
│            └── logger.py
│
├── watermark/                                              # Per-table JSON watermark files + DQ failure reports.
│     ├── <table>.json                                      # One file per Bronze table (e.g. orders.json)
│     ├── transformation/                                   # Silver DQ failure reports written here on gate failure.
│     └── loading/                                         # Gold DQ failure reports written here on gate failure.
│
├── .env                                                    # Project environment variables (Not Committed)
├── .gitignore
├── .python-version
├── LICENSE
├── main.py
├── pyproject.toml                                          # Stores Python project settings and dependencies
├── README.md
└── uv.lock                                                 # (Not Committed)                  

Pipeline Orchestration

The pipeline is orchestrated by Apache Airflow with the following configuration:

| Property | Value |
| --- | --- |
| DAG ID | ecommerce_etl_pipeline |
| Schedule | Mon, Wed, Fri at 1:00 PM IST |
| Cron Expression | 0 13 * * MON,WED,FRI |
| Timezone | Asia/Kolkata |
| Retries | 1 (5-minute delay) |
| On Failure | No email alert |
| On Success | HTML summary email notification |

Task Flow

extract_data  ──►  transform_data  ──►  load_data  ──►  send_email

| Task | Script | Description |
| --- | --- | --- |
| extract_data | scripts/extraction/extract.py | Incremental upsert of raw data into Bronze |
| transform_data | scripts/transformation/transform.py | dbt snapshot → silver run → silver test (90% gate) |
| load_data | scripts/loading/load.py | dbt gold run → gold test (92% gate) |
| send_email | Airflow EmailOperator | HTML success notification |

Data Layers

Bronze — Raw Ingestion

The Bronze layer is the raw landing zone. Data is loaded from CSV files exactly as generated: no renaming, no casting, no filtering. The extractor adds one surrogate column (_etl_id SERIAL) and uses an upsert to make every run idempotent.

Source Tables:

| Bronze Table | Source File | Primary Key |
| --- | --- | --- |
| bronze.categories | categories.csv | id |
| bronze.customers | customers.csv | id |
| bronze.order_items | order_items.csv | id |
| bronze.orders | orders.csv | id |
| bronze.payments | payments.csv | id |
| bronze.product_returns | product_returns.csv | id |
| bronze.products | products.csv | id |
| bronze.staffs | staffs.csv | id |
| bronze.stocks | stocks.csv | id |
| bronze.stores | stores.csv | id |

Row outcomes tracked per run: inserted, updated, skipped, failed. A non-zero failed count exits with code 1 and triggers an Airflow retry.
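
One way to express such an upsert in PostgreSQL is INSERT ... ON CONFLICT. The sketch below only builds the statement text; it is not the code in extract.py. The function name build_upsert_sql, the alias t, and the timestamp guard in the WHERE clause (one plausible source of "skipped" rows) are assumptions for illustration.

```python
def build_upsert_sql(
    schema: str,
    table: str,
    columns: list[str],
    pk: str = "id",
    ts_col: str = "updated_at",
) -> str:
    """Build a PostgreSQL upsert that overwrites a row only if the new one is newer.

    Identifiers are interpolated directly, so they must come from trusted
    configuration, never from user input. Values use psycopg2-style named
    placeholders such as %(id)s.
    """
    col_list = ", ".join(columns)
    placeholders = ", ".join(f"%({c})s" for c in columns)
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c != pk)
    return (
        f"INSERT INTO {schema}.{table} AS t ({col_list}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({pk}) DO UPDATE SET {updates} "
        # Assumed guard: an older or identical row leaves the target untouched.
        f"WHERE t.{ts_col} < EXCLUDED.{ts_col}"
    )
```

Because a conflicting row is updated rather than inserted again, re-running the same batch does not create duplicates.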


Silver — Cleaned and Standardized

The Silver layer is managed by transform.py, which runs three dbt steps in strict order:

dbt snapshot  ──►  dbt run --select silver  ──►  dbt test --select silver

Silver models use the incremental materialization with a merge strategy and a 3-day lookback buffer to catch late-arriving Bronze rows. Every model applies a full CTE chain:

  1. raw_cleaned: casts IDs and timestamps to prevent type errors in later steps
  2. incremental_filter: applies the 3-day lookback window (skipped on first run)
  3. duplicate_check: ROW_NUMBER() partitioned by PK, ordered by updated_at DESC
  4. fixed: keeps only rnk = 1
  5. email_cleaned: corrects common typos (gmail.con → gmail.com) and nullifies malformed emails
  6. email_batch_deduped: nullifies emails that appear more than once in a single batch
  7. Final SELECT: applies all business rules and casts to final types

Key transformation rules: type casting, safe date validation, deduplication, null coercion, string standardization (TRIM, LOWER, INITCAP), phone validation (^(91)?[6-9][0-9]{9}$), status validation, and logical date correction.


Snapshots — SCD Type 2

Two entities change over time — customers and products. dbt snapshot models capture the full history of every change so the Gold layer can reconstruct the exact state of any customer or product at the time a historical transaction occurred.

| Snapshot | Source | Unique Key | Strategy | Hard Deletes |
| --- | --- | --- | --- | --- |
| snap_customers | silver.customers | customer_id | timestamp on updated_at | Yes |
| snap_products | silver.products | product_id | timestamp on updated_at | Yes |

By default, dbt adds four metadata columns to each snapshot table: dbt_valid_from, dbt_valid_to, dbt_updated_at, and dbt_scd_id. The current version of a record is the row whose dbt_valid_to is NULL. invalidate_hard_deletes=True closes a record if its source row disappears from Silver.

Important: Snapshots run before Silver models in each pipeline run. They read Silver as it was at the end of the previous run and capture any changes that accumulated since then.


Gold — Star Schema

The Gold layer is managed by load.py, which runs two dbt steps:

dbt run --select gold  ──►  dbt test --select gold

Gold reads directly from Silver. A failed Silver run must never be followed by a Gold run.

Dimension Tables:

| Table | Grain | Source |
| --- | --- | --- |
| dim_customer | One row per customer | silver.customers |
| dim_product | One row per product | silver.products + silver.categories |
| dim_staff | One row per staff member + sentinel row (id = -1) | silver.staffs |
| dim_store | One row per store | silver.stores |

Fact Tables:

| Table | Grain | Materialization | Key Sources |
| --- | --- | --- | --- |
| fct_sales | One row per order line item | Incremental merge | silver.order_items + silver.orders |
| fct_payments | One row per payment | Incremental merge | silver.payments + silver.orders |
| fct_returns | One row per return | Incremental merge | silver.product_returns + silver.orders |
| fct_inventory | One row per (store, product) | Full refresh | silver.stocks |

Data Model

                        dim_customer
                             │
          ┌──────────────────┼──────────────────┐
          │                  │                  │
       fct_sales        fct_payments        fct_returns
          │                                     │
     ┌────┼────┐                                │
     │    │    │                                │
dim_store │  dim_staff                     dim_product
       dim_product                              │
                                         fct_inventory
                                          ┌────┴────┐
                                       dim_store  dim_product

Cross-fact relationships:

| Fact A | Fact B | Shared Key | Relationship |
| --- | --- | --- | --- |
| fct_sales | fct_returns | order_item_id | One-to-one |
| fct_sales | fct_payments | order_id | One-to-many |
| fct_sales | fct_inventory | product_id + store_id | Many-to-one |

Revenue calculation in fct_sales:

gross_revenue   = quantity × unit_price
discount_amount = quantity × unit_price × (discount_percent / 100)
net_revenue     = line_total (sourced from Silver, pre-validated)
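
A worked example of the first two formulas, using Decimal to avoid float rounding. This is a sketch: the function name, the rounding to two decimal places, and the expected_net cross-check (gross minus discount) are assumptions. In the warehouse, net_revenue is taken from line_total rather than recomputed.

```python
from decimal import Decimal, ROUND_HALF_UP

CENT = Decimal("0.01")


def revenue_measures(quantity: int, unit_price: str, discount_percent: str) -> dict[str, Decimal]:
    """Compute gross revenue and discount amount for one order line."""
    gross = Decimal(quantity) * Decimal(unit_price)
    discount = gross * Decimal(discount_percent) / Decimal(100)
    return {
        "gross_revenue": gross.quantize(CENT, rounding=ROUND_HALF_UP),
        "discount_amount": discount.quantize(CENT, rounding=ROUND_HALF_UP),
        # Assumed cross-check value; compare it against Silver's line_total.
        "expected_net": (gross - discount).quantize(CENT, rounding=ROUND_HALF_UP),
    }


# 3 units at 19.99 with a 10% discount
measures = revenue_measures(3, "19.99", "10")
# gross_revenue 59.97, discount_amount 6.00, expected_net 53.97
```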

Incremental Loading Strategy

The pipeline avoids full reloads at every layer. Each layer uses a different mechanism suited to its tooling:

| Layer | Mechanism | Filter Column | Lookback Buffer |
| --- | --- | --- | --- |
| Bronze | JSON watermark file per table | updated_at | None |
| Silver | dbt incremental merge | updated_at | 3 days |
| Gold | Full rebuild from current Silver | | |

Bronze watermark file structure:

{
    "table":          "orders",
    "ts_col":         "updated_at",
    "last_watermark": "2026-05-30T14:22:00",
    "last_run":       "2026-05-31T05:30:00",
    "rows_loaded":    1200
}

The watermark is only advanced after at least one row is committed. Zero-row runs leave the watermark unchanged, making the operation fully idempotent.

Silver's 3-day lookback guards against late-arriving records:

WHERE updated_at_clean >=
    COALESCE(
        (SELECT MAX(updated_at) FROM {{ this }}),
        TIMESTAMP '1900-01-01'
    ) - INTERVAL '3 days'

Bypass flags for full reloads:

# Full Bronze reload (ignores watermark)
python -m scripts.extraction.extract datasets/ --full-load

# Full Silver and Gold rebuild
python -m scripts.transformation.transform --full-refresh
python -m scripts.loading.load --full-refresh

Data Quality Gates

Every layer is gated by data quality tests before the next layer runs. The pipeline halts and exits with code 1 if the gate fails.

| Layer | Tests | Pass Threshold | Failure Report |
| --- | --- | --- | --- |
| Silver | 200 dbt tests | 90% (PASS + WARN) | watermark/transformation/dq_failure_YYYYMMDD_HHMMSS.json |
| Gold | dbt tests | 92% (PASS + WARN) | watermark/loading/dq_failure_gold_YYYYMMDD_HHMMSS.json |

Gold is held to a stricter threshold (92%) because it is consumed directly by BI tools.
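
The gate arithmetic is simple enough to sketch. The function below takes a list of dbt test statuses (dbt reports values such as pass, warn, fail, and error) and applies the PASS + WARN rule. The function name, the treatment of an empty test run as a failure, and the use of >= for the comparison are assumptions.

```python
def gate_passes(statuses: list[str], threshold: float) -> tuple[bool, float]:
    """Return (gate result, pass rate), where pass rate = (pass + warn) / total."""
    if not statuses:
        return False, 0.0  # assumed: no test results means the gate cannot pass
    ok = sum(1 for s in statuses if s.lower() in {"pass", "warn"})
    rate = ok / len(statuses)
    return rate >= threshold, rate


# Hypothetical Silver run: 200 tests with 180 pass, 6 warn, 14 fail
statuses = ["pass"] * 180 + ["warn"] * 6 + ["fail"] * 14
passed, rate = gate_passes(statuses, 0.90)  # rate is 0.93
```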

Known acceptable warnings (Silver — 6 documented source gaps):

| Warning | Reason |
| --- | --- |
| not_null_product_returns_return_date | 19 returns missing a return date at source |
| not_null_staffs_salary | 2 staff records missing salary at source |
| not_null_stores_postal_code | 1 store record missing a postal code at source |
| assert_orphan_delivered_orders_without_payments | 34 delivered orders with no payment record |
| assert_orphan_orders_without_items | 32 orders with no line items |
| assert_rel_order_staff_works_at_order_store | 17 orders where staff belong to a different store |

Threshold by environment:

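| Environment | Silver | Gold | How to Set |
| --- | --- | --- | --- |
| DEV / CI | 0.85 | 0.85 | DBT_PASS_THRESHOLD=0.85, DBT_GOLD_PASS_THRESHOLD=0.85 |
| Staging | 0.90 | 0.90 | DBT_PASS_THRESHOLD=0.90, DBT_GOLD_PASS_THRESHOLD=0.90 |
| Production | 0.95 | 0.95 | DBT_PASS_THRESHOLD=0.95, DBT_GOLD_PASS_THRESHOLD=0.95 |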

Getting Started

Prerequisites

  • Python 3.11+
  • Docker and Docker Compose
  • PostgreSQL (or Docker-based instance)
  • uv (Python package manager)

Installation

# 1. Clone the repository
git clone https://github.com/<your-username>/E-com.git
cd E-com

# 2. Install Python dependencies
uv sync

# 3. Copy and configure environment variables
cp .env.example .env
# Edit .env with your PostgreSQL credentials

# 4. Start Airflow via Docker Compose
cd airflow/plugins
docker compose up -d

# 5. Generate synthetic data
python generators/run_all.py

Running the Pipeline Manually

# Run the full pipeline end-to-end
python main.py

# Or run each stage individually
python -m scripts.extraction.extract datasets/
python -m scripts.transformation.transform
python -m scripts.loading.load

Running dbt Directly

cd retail_data_warehouse

dbt deps                        # Install dbt packages
dbt snapshot                    # Run SCD Type 2 snapshots
dbt run --select silver         # Build Silver models
dbt test --select silver        # Test Silver (90% gate)
dbt run --select gold           # Build Gold models
dbt test --select gold          # Test Gold (92% gate)

Environment Variables

| Variable | Default | Description |
| --- | --- | --- |
| ETL_SCHEMA | bronze | PostgreSQL schema for Bronze tables |
| ETL_TS_COL | updated_at | Timestamp column used for watermark filtering |
| ETL_PK_COL | id | Primary key column for upsert conflict target |
| WATERMARK_DIR | watermark | Directory for JSON watermark files |
| DBT_PASS_THRESHOLD | 0.90 | Silver DQ gate pass-rate threshold |
| DBT_GOLD_PASS_THRESHOLD | 0.92 | Gold DQ gate pass-rate threshold |
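
Reading these variables with their documented defaults might look like the sketch below. The Settings dataclass and load_settings function are hypothetical and are not the contents of configs/settings.py.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    etl_schema: str
    etl_ts_col: str
    etl_pk_col: str
    watermark_dir: str
    silver_threshold: float
    gold_threshold: float


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read pipeline settings, falling back to the defaults listed above."""
    return Settings(
        etl_schema=env.get("ETL_SCHEMA", "bronze"),
        etl_ts_col=env.get("ETL_TS_COL", "updated_at"),
        etl_pk_col=env.get("ETL_PK_COL", "id"),
        watermark_dir=env.get("WATERMARK_DIR", "watermark"),
        silver_threshold=float(env.get("DBT_PASS_THRESHOLD", "0.90")),
        gold_threshold=float(env.get("DBT_GOLD_PASS_THRESHOLD", "0.92")),
    )
```

Accepting the environment as a mapping keeps the function easy to test with a plain dictionary.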

Key Design Decisions

Medallion Architecture — Three layers with single responsibilities. A failure in Silver does not corrupt Bronze. Raw data is always recoverable. Each layer can be independently tested and rerun.

Incremental watermarks — Full table reloads become prohibitively slow as data grows. JSON watermarks keep extraction time proportional to new data volume, not total volume. The --full-load flag handles backfills cleanly.

SCD Type 2 via dbt snapshots — Customers change addresses; products change prices. Without history, a sales fact joined to a current dimension would reflect today's values rather than those at the time of the transaction, corrupting historical analysis.

dbt for transformation — dbt compiles SQL models into a dependency graph, handles execution ordering, and provides built-in testing for nulls, uniqueness, and referential integrity — without a separate orchestration layer for transformation logic.

Data quality gates — Without gates, a pipeline can succeed even when large numbers of tests are failing, causing silent data corruption to flow into BI reports. Gates with machine-readable failure reports allow Airflow callbacks and alerting integrations to react automatically.

Sentinel rows for unresolved FKs — A sentinel row (staff_id = -1) in dim_staff and COALESCE(staff_id, -1) in fct_sales keeps referential integrity intact while making unassigned orders explicitly visible and filterable in reports — rather than breaking joins or producing misleading nulls.

No dim_date — Time context is carried directly as DATE/TIMESTAMP columns on each fact table, keeping the schema lean. A dim_date can be added later if pre-computed fiscal periods, holidays, or week numbering are required.


License

This project is licensed under the terms of the LICENSE file included in this repository.


Built as a portfolio project demonstrating production-grade data engineering patterns: incremental loading, SCD Type 2, Medallion Architecture, dbt transformations, and automated data quality enforcement.
