This project implements an ETL (Extract, Transform, Load) pipeline using Apache Airflow to fetch book data from the Google Books API and store it in a PostgreSQL database.
The pipeline performs the following operations:
- Extract: Fetches data engineering books from Google Books API
- Transform: Cleans and deduplicates the fetched book data
- Load: Stores the processed data into a PostgreSQL database table
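At its core, the transform step is title-based deduplication of the fetched records. A minimal sketch of that logic, assuming each book is a dict with a `title` key (the field names here are illustrative, not the DAG's actual variable names):

```python
def deduplicate_books(books):
    """Keep the first record seen for each title (case-insensitive)."""
    seen = set()
    unique = []
    for book in books:
        key = book["title"].strip().lower()
        if key not in seen:
            seen.add(key)
            unique.append(book)
    return unique
```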
```
GOOGLE_BOOKS_ETL/
├── dags/
│   └── dag.py              # Main Airflow DAG definition
├── config/
│   └── airflow.cfg         # Airflow configuration file
├── plugins/                # Custom Airflow plugins (currently empty)
├── logs/                   # Airflow execution logs
├── docker-compose.yaml     # Docker Compose configuration for Airflow stack
└── README.md               # This file
```
The main DAG file that defines the ETL workflow. It contains:
- `get_google_books_data()`: Fetches book data from the Google Books API
  - Retrieves book titles, authors, prices, and ratings
  - Handles pagination to fetch multiple pages
  - Removes duplicates based on book titles
  - Stores data in XCom for task communication
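The pagination can be sketched as below. The endpoint URL is the public Google Books volumes API; the default query string, helper names, and page size are illustrative assumptions (the API caps `maxResults` at 40 per request):

```python
import json
import urllib.parse
import urllib.request

API_URL = "https://www.googleapis.com/books/v1/volumes"

def page_params(query, num_books, page_size=40):
    """Yield query-parameter dicts covering num_books results.
    The Google Books API caps maxResults at 40 per request."""
    start = 0
    while start < num_books:
        yield {"q": query, "startIndex": start,
               "maxResults": min(page_size, num_books - start)}
        start += page_size

def fetch_books(query="data engineering", num_books=50):
    """Fetch up to num_books volumes, one HTTP request per page."""
    items = []
    for params in page_params(query, num_books):
        url = API_URL + "?" + urllib.parse.urlencode(params)
        with urllib.request.urlopen(url, timeout=10) as resp:
            items.extend(json.load(resp).get("items", []))
    return items[:num_books]
```

For the default of 50 books this produces two requests: one for results 0-39 and one for results 40-49.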
- `create_books_table()`: Creates the `books` table in PostgreSQL if it doesn't exist
  - Table schema: `id`, `title`, `author`, `price`, `rating`
- `insert_book_data_into_postgres()`: Loads the extracted data into PostgreSQL
  - Retrieves data from XCom
  - Inserts records into the `books` table
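The load step reduces to a parameterized `INSERT` executed once per record. A hedged sketch matching the schema above (the helper below only shapes the XCom payload; the real DAG executes the statement through the `books_connection` Postgres connection):

```python
# Parameterized INSERT matching the books table schema described above
INSERT_SQL = (
    "INSERT INTO books (title, author, price, rating) "
    "VALUES (%s, %s, %s, %s)"
)

def book_rows(books):
    """Shape the list of book dicts pulled from XCom into tuples in
    the same order as the INSERT statement's placeholders."""
    return [
        (b.get("title"), b.get("author"), b.get("price"), b.get("rating"))
        for b in books
    ]
```

Using placeholders rather than string formatting keeps API-supplied titles from breaking (or injecting into) the SQL.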
DAG Configuration:

- DAG ID: `fetch_and_store_google_books`
- Schedule: Runs daily (`timedelta(days=1)`)
- Tasks:
  - `fetch_book_data` → Fetches 50 books by default
  - `create_table` → Creates the PostgreSQL table
  - `insert_book_data` → Inserts data into the database
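Putting the three tasks together, the wiring in `dags/dag.py` looks roughly like this. The task IDs, schedule, and dependency order come from the configuration above; the operator choice, `start_date`, and `catchup` flag are illustrative assumptions, and the callables are stubbed here:

```python
from datetime import datetime, timedelta

from airflow import DAG
# Airflow 3 ships PythonOperator in the standard provider;
# Airflow 2.x imports it from airflow.operators.python instead.
from airflow.providers.standard.operators.python import PythonOperator

# Stubs standing in for the real callables defined in dags/dag.py
def get_google_books_data(num_books, **context): ...
def create_books_table(**context): ...
def insert_book_data_into_postgres(**context): ...

with DAG(
    dag_id="fetch_and_store_google_books",
    start_date=datetime(2024, 1, 1),  # illustrative; not stated in this README
    schedule=timedelta(days=1),
    catchup=False,
) as dag:
    fetch_book_data = PythonOperator(
        task_id="fetch_book_data",
        python_callable=get_google_books_data,
        op_args=[50],  # number of books to fetch
    )
    create_table = PythonOperator(
        task_id="create_table",
        python_callable=create_books_table,
    )
    insert_book_data = PythonOperator(
        task_id="insert_book_data",
        python_callable=insert_book_data_into_postgres,
    )

    # extract -> create table -> load
    fetch_book_data >> create_table >> insert_book_data
```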
Docker Compose configuration that sets up the complete Airflow stack:
- PostgreSQL: Database for Airflow metadata and book data
- Redis: Message broker for Celery executor
- Airflow Services:
  - `airflow-apiserver`: REST API server (port 8080)
  - `airflow-scheduler`: Schedules and monitors DAGs
  - `airflow-dag-processor`: Processes DAG files
  - `airflow-worker`: Executes tasks using Celery
  - `airflow-triggerer`: Handles deferred tasks
  - `airflow-init`: Initializes the Airflow database and creates the admin user
- pgAdmin: PostgreSQL administration tool (port 5050)
- Flower: Celery monitoring tool (optional, port 5555)
Airflow configuration file containing settings for:
- DAG folder paths
- Executor configuration
- Database connections
- Logging settings
- Security settings
- And other Airflow core configurations
Directory for custom Airflow plugins. Currently empty but can be used to add:
- Custom operators
- Custom hooks
- Custom sensors
- Custom executors
Directory where Airflow stores execution logs for all DAG runs and tasks.
Before running this project, ensure you have:
- Docker Desktop installed and running
  - Download from: https://www.docker.com/products/docker-desktop
  - Ensure Docker has at least:
    - 4GB RAM
    - 2 CPUs
    - 10GB disk space
- Git (optional, for cloning the repository)
```
cd GOOGLE_BOOKS_ETL
```

Create a `.env` file in the project root if you want to customize settings:

```
AIRFLOW_UID=50000
AIRFLOW_IMAGE_NAME=apache/airflow:3.1.1
AIRFLOW_PROJ_DIR=.
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow
```

On Linux/Mac, set the Airflow user ID:

```
echo -e "AIRFLOW_UID=$(id -u)" > .env
```

On Windows, you can skip this step or manually create `.env` with:

```
AIRFLOW_UID=50000
```
```
docker-compose up -d
```

This command will:

- Pull required Docker images (first time only)
- Start all services (PostgreSQL, Redis, Airflow components)
- Initialize the Airflow database
- Create an admin user (username: `airflow`, password: `airflow`)

Wait for all services to be healthy (usually takes 2-3 minutes). Check status:

```
docker-compose ps
```

All services should show as "healthy" or "running".
Open your browser and navigate to:

- Airflow Web UI: http://localhost:8080
- Login credentials:
  - Username: `airflow`
  - Password: `airflow`
Before running the DAG, you need to set up the PostgreSQL connection:
- In the Airflow UI, go to Admin → Connections
- Click + to add a new connection
- Configure as follows:
  - Connection Id: `books_connection`
  - Connection Type: `Postgres`
  - Host: `postgres`
  - Schema: `airflow` (or create a new database)
  - Login: `airflow`
  - Password: `airflow`
  - Port: `5432`
- Click Save
Alternative: Create a separate database for books
If you want to use a separate database for books:
- Connect to PostgreSQL using pgAdmin (http://localhost:5050) or Docker:
  ```
  docker-compose exec postgres psql -U airflow -d airflow
  ```
- Create a new database:
  ```
  CREATE DATABASE books_db;
  ```
- Update the connection in Airflow to use `books_db` as the schema
- In the Airflow UI, find the DAG `fetch_and_store_google_books`
- Toggle it ON (unpause) using the switch on the left
- Click on the DAG name to view details
- Click the Play button (▶) to trigger a manual run, or wait for the scheduled run
- Via Airflow UI:
  - Go to the DAG page
  - Click Trigger DAG (play button)
- Via Command Line:
  ```
  docker-compose exec airflow-apiserver airflow dags trigger fetch_and_store_google_books
  ```
The DAG is configured to run automatically every day. No action needed - it will execute based on the schedule.
- Airflow UI: View DAG runs, task status, and logs
- Logs: Check task logs in the Airflow UI or in the `logs/` directory
- Database: Query the `books` table to see loaded data
Option 1: Using pgAdmin
- Open http://localhost:5050
- Login with:
  - Email: `admin@admin.com`
  - Password: `admin`
- Add server:
  - Host: `postgres`
  - Port: `5432`
  - Database: `airflow` (or `books_db` if created)
  - Username: `airflow`
  - Password: `airflow`
- Navigate to the `books` table and view data
Option 2: Using Docker CLI
```
docker-compose exec postgres psql -U airflow -d airflow -c "SELECT * FROM books LIMIT 10;"
```

Stop all services:

```
docker-compose down
```

Stop all services and remove volumes (this deletes all stored data):

```
docker-compose down -v
```

View logs:

```
# All services
docker-compose logs -f

# Specific service
docker-compose logs -f airflow-scheduler
docker-compose logs -f airflow-worker
```

Restart all services:

```
docker-compose restart
```

Check service status and verify the Airflow installation:

```
docker-compose ps
docker-compose exec airflow-apiserver airflow version
docker-compose exec airflow-apiserver airflow dags list
```

To change the number of books fetched, edit `dags/dag.py` and find the `fetch_book_data_task` definition:

```
op_args=[50],  # Change 50 to desired number
```

To change the schedule, edit `dags/dag.py` and find the `dag = DAG(...)` definition:

```
schedule=timedelta(days=1),  # Change schedule as needed
```

To change the database connection, update it in the Airflow UI (Admin → Connections) or modify the connection ID 'books_connection' in `dag.py` (in the `create_books_table()` and `insert_book_data_into_postgres()` functions).
- Check if the DAG file has syntax errors:
  ```
  docker-compose exec airflow-apiserver airflow dags list-import-errors
  ```
- Ensure the DAG is in the `dags/` folder
- Check the DAG processor logs:
  ```
  docker-compose logs airflow-dag-processor
  ```
- Verify the PostgreSQL connection is configured correctly in Airflow
- Check if the PostgreSQL service is running:
  ```
  docker-compose ps postgres
  ```
- Test the connection:
  ```
  docker-compose exec postgres pg_isready -U airflow
  ```
- Check task logs in Airflow UI
- Verify internet connection (for API calls)
- Check if Google Books API is accessible (verify API status)
- Change the ports in `docker-compose.yaml` if 8080 or 5050 are already in use
- Update the port mappings: `"8081:8080"` for Airflow, `"5051:80"` for pgAdmin
The project uses the following Python packages (included in the Airflow image):

- `apache-airflow`: Workflow orchestration
- `pandas`: Data manipulation
- `requests`: HTTP requests for API calls
- `psycopg2`: PostgreSQL adapter
- `apache-airflow-providers-postgres`: PostgreSQL provider for Airflow
This setup uses default credentials throughout. For production use, consider:

- Changing default passwords
- Setting up proper authentication
- Using environment variables for sensitive data
- Implementing proper network security
- Using HTTPS
- Setting up proper backup strategies
This project uses Apache Airflow, which is licensed under the Apache License 2.0.
Feel free to submit issues, fork the repository, and create pull requests for any improvements.
For issues or questions:
- Check the troubleshooting section above
- Review Airflow documentation: https://airflow.apache.org/docs/
- Check Docker logs for detailed error messages
Happy ETL-ing!
