Great Expectations is a powerful tool for validating the quality of your data. To get the most out of it, we wanted Great Expectations to be part of our data pipelines: whenever a pipeline finishes all of its transformations, we want to run the expectation suites related to that pipeline against the newly transformed data. It turns out that we needed a way to run Great Expectations with a configurable data source and expectation suite(s). For this purpose, we came up with a wrapper that allows us to pass the data source and expectation suites (among other parameters).
This project shows how to create such a wrapper so that Great Expectations can be run on a Spark cluster.
The eventual goal is that we can run our Expectation Suites using the following command:
generate_dashboard --pipeline PIPELINE_NAME
For our use case, we only want to load the expectation suites that are related to the pipeline. To make this easy, we structure the way we store the suites as follows:
.
|-- cli.py
|-- great_expectations
| |-- expectations
| | |-- expectation_definitions_1.json
| | |-- expectation_definitions_2.json
| | |-- expectation_definitions_3.json
|-- suites
| |-- PIPELINE_A
| | |-- pipeline_a_suites.yml
| |-- PIPELINE_B
| | |-- pipeline_b_suites.yml
Here, the JSON files are the ones generated by Great Expectations when you save the Jupyter notebook, and the YAML files look like the following:
# pipeline_a_suites.yml
- suite_name: expectation_definitions_1.json
  database_name: some_database
  table_name: some_table_1
- suite_name: expectation_definitions_2.json
  database_name: some_database
  table_name: some_table_2
The wrapper is responsible for the following:
- provide the generate_dashboard command
- load the correct suites
- create the DataContext
- run the validations
We use Poetry and Typer to create the wrapper. Let's create a CLI script that can handle our input parameters, as sketched below. For the sake of simplicity, there is no error handling or reporting.
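Here is a minimal sketch of what cli.py could look like. It assumes Typer, PyYAML, and PySpark are available; for brevity it validates each table with Great Expectations' SparkDFDataset rather than wiring up a full DataContext with data docs. The load_suites helper and the directory constants are our own, not library APIs.

# cli.py -- a minimal sketch, not the full implementation
from pathlib import Path

import typer
import yaml
from great_expectations.dataset import SparkDFDataset
from pyspark.sql import SparkSession

app = typer.Typer()

# Mirror the directory layout shown above.
BASE_DIR = Path(__file__).parent
EXPECTATIONS_DIR = BASE_DIR / "great_expectations" / "expectations"
SUITES_DIR = BASE_DIR / "suites"


def load_suites(pipeline: str) -> list:
    # Read the YAML file that couples suites to tables for this pipeline.
    suites_file = next((SUITES_DIR / pipeline.upper()).glob("*.yml"))
    return yaml.safe_load(suites_file.read_text())


@app.command()
def generate_dashboard(
    pipeline: str = typer.Option(...),
    s3_bucket: str = typer.Option(None),  # used later to publish the docs
):
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    for entry in load_suites(pipeline):
        df = spark.table(f"{entry['database_name']}.{entry['table_name']}")
        # Validate the table against the suite saved from the notebook.
        suite_path = EXPECTATIONS_DIR / entry["suite_name"]
        result = SparkDFDataset(df).validate(expectation_suite=str(suite_path))
        typer.echo(f"{entry['suite_name']}: success={result['success']}")


if __name__ == "__main__":
    app()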
In order to have some data to run our expectations on, we will create a simple table on EMR. First off, SSH into your cluster and run spark-shell.
Let's start off by creating a database:
spark.sql("CREATE DATABASE great_expectations_demo")Now that the database is created, let's populate it with some data.
val df = Seq(
("Alice", "06123456789"),
("Bob", "06987654321")
).toDF("name", "phone")
df.write.option("path", "s3://bram-bucket-emr/great_expectations_demo.db").saveAsTable("great_expectations_demo.users")
To package our wrapper, just run the poetry build command. It will produce a .whl file which can be used to install our wrapper on an EMR cluster.
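For poetry build to produce a wheel that actually exposes the generate_dashboard command, the CLI needs to be registered as a script. A pyproject.toml fragment could look roughly like this (the package name is taken from the install path shown later in this post; versions are placeholders):

# pyproject.toml (fragment)
[tool.poetry]
name = "great-expectations-emr"
version = "0.1.0"
description = "Run Great Expectations suites per pipeline on EMR"

[tool.poetry.dependencies]
python = "^3.7"
typer = "*"
pyyaml = "*"
great-expectations = "*"

[tool.poetry.scripts]
# Expose cli.py's Typer app as the generate_dashboard command.
generate_dashboard = "great_expectations_emr.cli:app"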
Copy the file to the EMR cluster with SCP and install it like any other pip package with
pip install PATH_TO_WHEEL --user
Or copy the .whl file to an S3 bucket to which the cluster has access and install it with
s3pip install s3://some-s3-bucket/PATH_TO_WHEEL --user
Now that the table we will read the data from has been created and the wrapper is installed on the cluster, it's time to see it in action. Run
generate_dashboard --pipeline pipeline_a --s3-bucket SOME_S3_BUCKET
If no S3 bucket has been provided to the generate_dashboard command, the output can be found on EMR itself.
Go to /home/hadoop/.local/lib/python3.7/site-packages/great_expectations_emr/great_expectations/uncommitted/data_docs/local_site/validations/SUITE_NAME,
where SUITE_NAME corresponds to the suite_name that is provided in pipeline_a_suites.yml.
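As an aside, if you are wondering how the --s3-bucket option could be honored: one approach (a sketch under our own assumptions, not necessarily the wrapper's actual code) is to upload the locally generated data docs to the bucket after validation:

# sketch: publish locally built data docs to S3; the prefix is an assumption
from pathlib import Path

import boto3


def upload_data_docs(local_site: Path, bucket: str, prefix: str = "data_docs") -> None:
    s3 = boto3.client("s3")
    for path in local_site.rglob("*"):
        if path.is_file():
            # Keep the site's directory structure as the S3 key.
            key = f"{prefix}/{path.relative_to(local_site)}"
            s3.upload_file(str(path), bucket, key)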