This project is an end-to-end ELT pipeline that extracts data from top Egyptian subreddits on Reddit, stores it in MongoDB, and cleans and transforms it into a shape that is easier to work with.
├── airflow.env
├── dags
│ └── reddit_dag.py # DAG definition for the Reddit pipeline
├── docker-compose.yml # Docker services configuration
├── Dockerfile # Custom Airflow image build file
├── elts
│ └── reddit_elt.py # Extract, Load, and Transform functions
├── pipelines
│ └── reddit_pipeline.py # Connects ELT with Airflow DAG
├── requirements.txt
├── utils
│ ├── connections.py # Connects to APIs and databases
│ └── constants.py
└── work_flow_diagram.png
- Python 3.12.3 for scripting
- Apache Airflow 2.7.1 for orchestration
- MongoDB 6 for data storage
- PRAW for the Reddit API
- Docker & Docker Compose for infrastructure
- **Post Extraction:** Fetches top daily posts from subreddits like r/Egypt, r/CAIRO, and r/Masr.
- **Smart Sync:** Queries MongoDB to identify "active posts" (posts created within the last 30 days) and fetches new comments only if a post has not been synced in the last 24 hours, optimizing API rate limits.
- **Atomic Upserts:** Uses MongoDB `UpdateOne` with `upsert=True` to prevent duplicate records and update existing post scores.
- **Data Lineage:** Every record is timestamped with `ingested_at` and `last_sync_utc` to track data freshness and audit the pipeline.
- **Merging:** Joins posts and comments into a unified `processed_reddit_data` collection.
- **Optimization:** A specialized pipeline unsets 20+ redundant metadata fields and filters out `null` values, significantly reducing the storage footprint and improving query speed for BI tools.
Configure your `/config/config.conf` file with your Reddit API and MongoDB credentials:

```ini
[mongodb]
mongo_uri = mongodb://mongodb:27017
mongo_db = reddit_db
raw_collection = raw_posts
clean_collection = clean_posts

[api_keys]
reddit_secret_key = <your secret key>
reddit_client_id = <your client id>
user_agent = <your user agent>
```
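A config file in this shape can be read with Python's standard `configparser`. The sketch below is a hypothetical helper (the project's actual loading logic presumably lives in `utils/constants.py`); the `load_settings` name is an assumption.

```python
import configparser

def load_settings(path: str = "/config/config.conf") -> dict:
    """Read the pipeline's settings from an INI-style config file.

    Hypothetical helper mirroring what utils/constants.py might do;
    section and key names match the sample config above.
    """
    parser = configparser.ConfigParser()
    with open(path) as f:
        parser.read_file(f)
    return {
        "mongo_uri": parser["mongodb"]["mongo_uri"],
        "mongo_db": parser["mongodb"]["mongo_db"],
        "raw_collection": parser["mongodb"]["raw_collection"],
        "clean_collection": parser["mongodb"]["clean_collection"],
        "client_id": parser["api_keys"]["reddit_client_id"],
        "secret": parser["api_keys"]["reddit_secret_key"],
        "user_agent": parser["api_keys"]["user_agent"],
    }
```

Failing fast here (a `KeyError` on a missing section or key) surfaces misconfiguration before the DAG starts calling external APIs.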
This step is useful for local development, testing, or understanding dependencies before running the pipeline using Docker.
```shell
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
```
Run the following command to build the custom Airflow image (optimized with Layer Caching) and start all services:
```shell
docker-compose up -d --build
```
- Access the Airflow web UI (with the default docker-compose setup, typically at http://localhost:8080).
- Log in with:
  - username: `airflow`
  - password: `airflow`
- Enable the `reddit_pipeline_dag` to trigger the first run.
- **Raw Layer:** `raw_posts`, `raw_comments` (minimal processing).
- **Analytical Layer:** `final_analytics` (cleaned, merged, and optimized for queries).
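The merge-and-cleanup step that produces the analytical layer can be expressed as a MongoDB aggregation pipeline. The stage list below is an illustrative sketch only: the join key `post_id`, the sample field names, and the short `REDUNDANT_FIELDS` list are assumptions (the document says 20+ fields are unset in practice).

```python
# Illustrative aggregation pipeline: join comments onto posts, drop
# redundant metadata, filter out null values, and write the result to
# the analytical collection. Field names are examples, not the
# project's exact schema.
REDUNDANT_FIELDS = ["thumbnail", "media_embed", "secure_media", "awarders"]

cleanup_pipeline = [
    # Attach each post's comments from the raw comments collection.
    {"$lookup": {
        "from": "raw_comments",
        "localField": "post_id",
        "foreignField": "post_id",
        "as": "comments",
    }},
    # Remove redundant metadata fields to shrink the documents.
    {"$unset": REDUNDANT_FIELDS},
    # Drop records whose title is null.
    {"$match": {"title": {"$ne": None}}},
    # Upsert the cleaned documents into the analytical collection.
    {"$merge": {"into": "final_analytics", "whenMatched": "replace"}},
]
```

With pymongo this would run as `db.raw_posts.aggregate(cleanup_pipeline)`; keeping the heavy lifting inside MongoDB avoids round-tripping every document through Python.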
If you want to re-test the sync logic, delete the sync timestamps from the MongoDB shell or Compass:

```javascript
db.raw_posts.updateMany({}, { $unset: { last_comments_sync_utc: "" } })
```
Contributions are welcome! Please open issues or submit pull requests for improvements or bug fixes.
