- Open up a terminal and create a directory for Airflow.

  ```shell
  cd ~
  mkdir airflow_standalone
  cd airflow_standalone
  ```
- Install and activate a virtual environment.

  ```shell
  pip install virtualenv
  python -m venv venv
  source venv/bin/activate
  ```
- Install Airflow.

  ```shell
  pip install apache-airflow
  ```
- Set the `AIRFLOW_HOME` environment variable, which tells Airflow where to store its config and log files.

  ```shell
  (venv) export AIRFLOW_HOME=~/airflow_standalone/airflow_home
  ```
- Create the SQLite metadata database, which also creates `airflow.cfg` and `unittests.cfg`.

  ```shell
  (venv) airflow initdb
  ```
- Start the Airflow webserver on port 8080; you can then visit localhost:8080.

  ```shell
  (venv) airflow webserver --port 8080
  ```
- Open another terminal window and start the scheduler.

  ```shell
  cd ~/airflow_standalone
  source venv/bin/activate
  (venv) airflow scheduler
  ```
- If you get `ValueError: unknown locale: UTF-8` when trying to run the example DAGs, you need to set the `LC_ALL` variable. Read more on GitHub.

  ```shell
  (venv) export LC_ALL=C
  ```
- To stop all webserver and scheduler processes, kill them from a third terminal window.

  ```shell
  kill -9 `ps aux | grep airflow | awk '{print $2}'`
  ```
A music streaming company, Sparkify, has decided that it is time to introduce more automation and monitoring to their data warehouse ETL pipelines, and has come to the conclusion that the best tool to achieve this is Apache Airflow. The source data resides in S3 and needs to be processed in Sparkify's data warehouse in Amazon Redshift. The source datasets consist of JSON logs describing user activity in the application and JSON metadata about the songs the users listen to.
```
s3://udacity-dend/song_data
s3://udacity-dend/log_data
```
The first dataset is a subset of real data from the Million Song Dataset. Each file is in JSON format and contains metadata about a song and the artist of that song. The files are partitioned by the first three letters of each song's track ID.
```
song_data/A/A/B/TRAABJL12903CDCF1A.json
```

```json
{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}
```

The second dataset consists of log files in JSON format generated by this event simulator based on the songs in the dataset above. These simulate app activity logs from an imaginary music streaming app based on configuration settings. The log files in the dataset you'll be working with are partitioned by year and month. For example, here is the filepath to one file in this dataset.
```
log_data/2018/11/2018-11-01-events.json
```

```json
{"artist":null,"auth":"Logged In","firstName":"Walter","gender":"M","itemInSession":0,"lastName":"Frye","length":null,"level":"free","location":"San Francisco-Oakland-Hayward, CA","method":"GET","page":"Home","registration":1540919166796.0,"sessionId":38,"song":null,"status":200,"ts":1541105830796,"userAgent":"\"Mozilla\/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/36.0.1985.143 Safari\/537.36\"","userId":"39"}
```

Using Apache Airflow, we should build data pipelines that are dynamic, built from reusable tasks, can be monitored, and allow easy backfills. Data quality plays a big part when analyses are executed on top of the data warehouse, so tests need to be run against the datasets after the ETL steps have been executed to catch any discrepancies.
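For illustration, a log event like the one above can be parsed with Python's `json` module. The `ts` field is a Unix epoch timestamp in milliseconds, which the ETL needs to convert to a proper timestamp (field names follow the sample record; the snippet uses an abridged copy of it):

```python
import json
from datetime import datetime, timezone

# Abridged copy of the sample event above, keeping only the fields used here.
event = json.loads(
    '{"artist":null,"page":"Home","ts":1541105830796,'
    '"userId":"39","level":"free"}'
)

# `ts` is a Unix epoch timestamp in milliseconds; convert it to a UTC datetime.
start_time = datetime.fromtimestamp(event["ts"] / 1000.0, tz=timezone.utc)
print(start_time.date())  # 2018-11-01, matching the file's year/month partition
```

Note how the date recovered from `ts` agrees with the `log_data/2018/11/...` partitioning scheme described above.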
- The DAG does not have dependencies on past runs;
- On failure, tasks are retried 3 times;
- Retries happen every 5 minutes;
- Catchup is turned off;
- Do not email on retry.
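The guidelines above translate roughly into the following DAG configuration. This is a sketch: the owner and start date are placeholders, and only the argument names from Airflow's `default_args` convention are assumed.

```python
from datetime import datetime, timedelta

# Sketch of default_args matching the guidelines above.
# The owner and start_date are placeholders, not the project's actual values.
default_args = {
    "owner": "sparkify",
    "start_date": datetime(2019, 1, 12),
    "depends_on_past": False,             # no dependencies on past runs
    "retries": 3,                         # retry failed tasks 3 times
    "retry_delay": timedelta(minutes=5),  # retries happen every 5 minutes
    "email_on_retry": False,              # do not email on retry
}

# Catchup is turned off on the DAG itself, not in default_args:
# dag = DAG("main_etl", default_args=default_args, catchup=False, ...)
```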
- Stage Operator - loads JSON files from S3 into Amazon Redshift.
  - Runs a SQL COPY command.
- Fact and Dimension Operators - use a SQL helper class to run data transformations.
  - Take the target database on which to run the query.
- Data Quality Operator - runs checks on the data itself.
  - For example, whether a certain column contains NULLs.
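To make the operator behavior concrete, here is a minimal Airflow-free sketch of two key pieces: a templated COPY statement like the one the stage operator renders, and a NULL check like the one the data quality operator could run. All names here, including the `run_query` callable, are illustrative assumptions rather than the project's actual API.

```python
# Illustrative template for the COPY the stage operator renders. Placeholders
# stand in for values the real operator pulls from its Airflow connections.
COPY_SQL = """
    COPY {table}
    FROM '{s3_path}'
    ACCESS_KEY_ID '{access_key}'
    SECRET_ACCESS_KEY '{secret_key}'
    FORMAT AS JSON '{json_option}'
"""

def has_nulls(run_query, table, column):
    """Data-quality check: return True if `column` in `table` contains NULLs.

    `run_query` is any callable that executes SQL and returns rows, e.g. a
    thin wrapper around a Redshift/Postgres cursor (illustrative interface).
    """
    rows = run_query(f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL")
    return rows[0][0] > 0

# Example with a stub standing in for a real database connection:
stub = lambda sql: [(0,)]
print(has_nulls(stub, "songs", "song_id"))  # False: the stub reports no NULLs
```

In the real operators, the rendered COPY runs against the staging tables, and a failing quality check would raise an exception so the Airflow task is marked failed and retried.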
```
project
|-- dags
|   |-- etl.py
|-- plugins
|   |-- __init__.py
|   |-- helpers
|   |   |-- __init__.py
|   |   |-- sql_queries.py
|   |-- operators
|   |   |-- __init__.py
|   |   |-- data_quality.py
|   |   |-- load_dimension.py
|   |   |-- load_fact.py
|   |   |-- stage_redshift.py
|-- create_tables.sql
|-- README.md
|-- .gitignore
```
- Software:
  - Running installation of Airflow (follow the steps at the top of this README)
- AWS account
- Python 3rd-party libraries (make sure you have the virtual environment activated):

  ```shell
  (venv) pip install boto3
  (venv) pip install botocore
  (venv) pip install psycopg2
  ```

- Clone or download the project.
- Make sure you meet all the requirements listed above.
- Create `redshift` and `aws_credentials` connections in the Airflow connections tab.
- Run the `create_tables.sql` script to create the tables. (This can be done via the Redshift query editor.)
- Open localhost:8080 and run the `main_etl` DAG.
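The connections can also be created from the terminal instead of the web UI. The following is a sketch using the `airflow connections --add` command from the Airflow 1.10 CLI; the hostname, credentials, and schema are placeholders for your own Redshift cluster.

```shell
# Run with the virtual environment activated. All values in <angle brackets>
# are placeholders -- substitute your own cluster endpoint and credentials.
airflow connections --add \
    --conn_id redshift \
    --conn_type postgres \
    --conn_host <your-cluster>.redshift.amazonaws.com \
    --conn_schema dev \
    --conn_login awsuser \
    --conn_password <password> \
    --conn_port 5439

airflow connections --add \
    --conn_id aws_credentials \
    --conn_type aws \
    --conn_login <aws-access-key-id> \
    --conn_password <aws-secret-access-key>
```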