In this sample, we'll build a BigQuery processing pipeline that queries a public dataset on a schedule, creates charts from the data, and then notifies users about the new charts via SendGrid, using Knative Eventing on GKE.
- Two `CloudSchedulerSource`s are set up to call the `QueryRunner` service once a day, one for each of two countries.
- `QueryRunner` receives the scheduler event for each country, queries Covid-19 cases for that country using BigQuery's public Covid-19 dataset, and saves the results in a separate BigQuery table. Once done, `QueryRunner` returns a custom `CloudEvent` of type `dev.knative.samples.querycompleted`.
- `ChartCreator` receives the `querycompleted` event, creates a chart from the BigQuery data using `matplotlib`, and saves it to a Cloud Storage bucket.
- `Notifier` receives the `com.google.cloud.storage.object.finalize` event from the bucket via a `CloudStorageSource` and sends an email notification to users using SendGrid.
We're assuming that you have already gone through the Cloud Storage triggered service tutorial, where you set up Knative with GCP and a Pub/Sub topic and also initialized Cloud Storage with Pub/Sub events. Here, we start by creating a bucket for the pipeline.
Create a unique storage bucket for the charts and make sure the bucket and the charts in it are public:

```sh
export BUCKET="$(gcloud config get-value core/project)-charts"
gsutil mb gs://${BUCKET}
gsutil uniformbucketlevelaccess set on gs://${BUCKET}
gsutil iam ch allUsers:objectViewer gs://${BUCKET}
```

Create two `CloudSchedulerSource`s to create Cloud Scheduler jobs that call `QueryRunner` for two different countries (the United Kingdom and Cyprus in this case).
`schedulersource-uk.yaml` and `schedulersource-cy.yaml` define the `CloudSchedulerSource`s with the schedule and country information (under the `data` field).
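The exact fields depend on the knative-gcp version you installed; a `CloudSchedulerSource` along these lines (the name, location, and schedule below are illustrative) might look like:

```yaml
apiVersion: events.cloud.google.com/v1
kind: CloudSchedulerSource
metadata:
  name: schedulersource-uk
spec:
  location: europe-west1        # Cloud Scheduler region
  schedule: "0 16 * * *"        # once a day at 16:00 UTC
  data: "United Kingdom"        # passed to QueryRunner in the event payload
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
```

The `sink` points at the Broker so the scheduler events flow into Knative Eventing.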
Create them:
```sh
kubectl apply -f schedulersource-uk.yaml -f schedulersource-cy.yaml
```

Check that they are running:
```sh
kubectl get cloudschedulersource

NAME                 READY
schedulersource-cy   True
schedulersource-uk   True
```

You should also see Cloud Scheduler jobs created:
```sh
gcloud scheduler jobs list

ID                                                   LOCATION      SCHEDULE (TZ)     TARGET_TYPE  STATE
cre-scheduler-2bcb33d8-3165-4eca-9428-feb99bc320e2   europe-west1  0 16 * * * (UTC)  Pub/Sub      ENABLED
cre-scheduler-714c0b82-c441-42f4-8f99-0e2eac9a5869   europe-west1  0 17 * * * (UTC)  Pub/Sub      ENABLED
```

Create a `CloudStorageSource` to connect storage events from the bucket where the charts will be saved to the Broker in Knative Eventing.
`storagesource-charts.yaml` defines the `CloudStorageSource`. Make sure you update the bucket name to the actual bucket name in your project.
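A `CloudStorageSource` of roughly this shape (the bucket name below is illustrative and must match yours) would route the bucket's events to the Broker:

```yaml
apiVersion: events.cloud.google.com/v1
kind: CloudStorageSource
metadata:
  name: storagesource-charts
spec:
  bucket: knative-atamel-charts   # replace with your bucket name
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
```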
Create it:
```sh
kubectl apply -f storagesource-charts.yaml
```

Check that it is running:
```sh
kubectl get cloudstoragesource

NAME                   READY
storagesource-charts   True
```

Make sure there's a Broker in the `default` namespace by following the instructions on the Broker Creation page.
This service receives Cloud Scheduler events for each country. It uses the BigQuery API to query the public Covid-19 dataset for those countries. Once done, it saves the results to a new BigQuery table and passes a custom event onwards.
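The actual service is written in C#; as a rough Python sketch of the query step (the function names, dataset/table names, and SQL are illustrative, not the service's exact code):

```python
def build_query(country: str) -> str:
    """Build SQL for daily Covid-19 case counts for one country.

    Assumes BigQuery's public Covid-19 dataset (the JHU CSSE summary table).
    """
    return f"""
        SELECT date, SUM(confirmed) AS num_reports
        FROM `bigquery-public-data.covid19_jhu_csse.summary`
        WHERE country_region = '{country}'
        GROUP BY date
        ORDER BY date ASC
    """


def run_query(country: str, project_id: str) -> None:
    """Run the query and save the results to a per-country table."""
    # Imported here so the pure helper above works without the client library.
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    table = country.lower().replace(" ", "")  # e.g. "unitedkingdom"
    job_config = bigquery.QueryJobConfig(
        destination=f"{project_id}.covid19.{table}",
        write_disposition="WRITE_TRUNCATE",  # overwrite yesterday's results
    )
    client.query(build_query(country), job_config=job_config).result()
```

After the destination table is written, the service replies with the custom `dev.knative.samples.querycompleted` CloudEvent so the next step in the pipeline fires.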
The code of the service is in the `query-runner` folder.
Inside the top-level `processing-pipelines` folder, build and push the container image:
```sh
export SERVICE_NAME=query-runner
docker build -t meteatamel/${SERVICE_NAME}:v1 -f bigquery/${SERVICE_NAME}/csharp/Dockerfile .
docker push meteatamel/${SERVICE_NAME}:v1
```

Create the service defined in `kservice.yaml`. Make sure you update `PROJECT_ID` with your actual project id; this is needed for the BigQuery client.
```sh
kubectl apply -f kservice.yaml
```

The trigger of the service filters on Cloud Scheduler execute events: `com.google.cloud.scheduler.job.execute`.
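A Trigger filtering on that event type might look roughly like this (the trigger and service names below match the ones used elsewhere in this sample):

```yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: trigger-query-runner
spec:
  broker: default
  filter:
    attributes:
      type: com.google.cloud.scheduler.job.execute
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: query-runner
```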
Create the trigger for the service defined in `trigger.yaml`:
```sh
kubectl apply -f trigger.yaml
```

This service receives the custom event from Query Runner, queries the BigQuery table for the requested country, and creates a chart from the data using the `matplotlib` library. Finally, the chart is uploaded to a public bucket in Cloud Storage.
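As a rough Python sketch of the chart step (helper names are illustrative; the real service also reads the rows from BigQuery and uploads the PNG with the Cloud Storage client):

```python
def chart_name(country: str) -> str:
    """Derive the object name for a country's chart, e.g. 'chart-unitedkingdom.png'."""
    return f"chart-{country.lower().replace(' ', '')}.png"


def create_chart(country: str, dates, case_counts, path: str) -> None:
    """Plot daily case counts and save the chart as a PNG."""
    # Local import so the pure helper above works without matplotlib installed.
    import matplotlib
    matplotlib.use("Agg")  # headless rendering inside a container
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots()
    ax.plot(dates, case_counts)
    ax.set(title=f"Covid-19 cases in {country}", xlabel="Date", ylabel="Cases")
    fig.autofmt_xdate()  # tilt the date labels so they don't overlap
    fig.savefig(path)
```

The object names produced by `chart_name` match the ones you'll see in the bucket at the end of the tutorial (`chart-cyprus.png`, `chart-unitedkingdom.png`).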
The code of the service is in the `chart-creator` folder.
Inside the `chart-creator/python` folder, build and push the container image:
```sh
export SERVICE_NAME=chart-creator
docker build -t meteatamel/${SERVICE_NAME}:v1 .
docker push meteatamel/${SERVICE_NAME}:v1
```

Create the service defined in `kservice.yaml`. Make sure you update the `BUCKET` env variable to the bucket name you created earlier.

```sh
kubectl apply -f kservice.yaml
```

The trigger of the service filters on the `dev.knative.samples.querycompleted` event type, which is the custom event type emitted by the query service.
Create the trigger for the service defined in `trigger.yaml`:
```sh
kubectl apply -f trigger.yaml
```

This service receives the Cloud Storage events from the `CloudStorageSource` and uses SendGrid to send an email to users that a new chart has been created. You need to set up a SendGrid account and create an API key. You can follow this doc for more details on how to set up SendGrid.
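As a rough Python sketch of the notification step (helper names and the sender address are illustrative; the real service reads the bucket and object names from the `finalize` event):

```python
def chart_url(bucket: str, obj: str) -> str:
    """Public URL of an object in a public Cloud Storage bucket."""
    return f"https://storage.googleapis.com/{bucket}/{obj}"


def notify(to_emails: str, api_key: str, bucket: str, obj: str) -> None:
    """Email users a link to the newly created chart via SendGrid."""
    # Local import so the URL helper above works without the sendgrid package.
    from sendgrid import SendGridAPIClient
    from sendgrid.helpers.mail import Mail

    message = Mail(
        from_email="noreply@example.com",  # illustrative sender address
        to_emails=to_emails,
        subject="A new chart is ready",
        html_content=f'See the new chart <a href="{chart_url(bucket, obj)}">here</a>.',
    )
    SendGridAPIClient(api_key).send(message)
```

The public URL works because the bucket was made world-readable with `gsutil iam ch allUsers:objectViewer` earlier.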
The code of the service is in the `notifier` folder.
Inside the `notifier/python` folder, build and push the container image:
```sh
export SERVICE_NAME=notifier
docker build -t meteatamel/${SERVICE_NAME}:v1 .
docker push meteatamel/${SERVICE_NAME}:v1
```

Create the service defined in `kservice.yaml`. Make sure you update `TO_EMAILS` and `SENDGRID_API_KEY` accordingly.

```sh
kubectl apply -f kservice.yaml
```

The trigger of the service filters on the `com.google.cloud.storage.object.finalize` event, which is the event type emitted by Cloud Storage when a file is saved to the bucket.
Create the trigger for the service defined in `trigger.yaml`:
```sh
kubectl apply -f trigger.yaml
```

Before testing the pipeline, make sure all the triggers are ready:

```sh
kubectl get trigger

NAME                    READY   REASON   BROKER    SUBSCRIBER_URI
trigger-chart-creator   True             default   http://chart-creator.default.svc.cluster.local
trigger-notifier        True             default   http://notifier.default.svc.cluster.local
trigger-query-runner    True             default   http://query-runner.default.svc.cluster.local
```

You can wait for Cloud Scheduler to trigger the services, or you can trigger the jobs manually.
Find the job IDs:

```sh
gcloud scheduler jobs list

ID                                                   LOCATION      SCHEDULE (TZ)     TARGET_TYPE  STATE
cre-scheduler-2bcb33d8-3165-4eca-9428-feb99bc320e2   europe-west1  0 16 * * * (UTC)  Pub/Sub      ENABLED
cre-scheduler-714c0b82-c441-42f4-8f99-0e2eac9a5869   europe-west1  0 17 * * * (UTC)  Pub/Sub      ENABLED
```

Trigger the jobs manually:
```sh
gcloud scheduler jobs run cre-scheduler-2bcb33d8-3165-4eca-9428-feb99bc320e2
gcloud scheduler jobs run cre-scheduler-714c0b82-c441-42f4-8f99-0e2eac9a5869
```

After a minute or so, you should see 2 charts in the bucket:

```sh
gsutil ls gs://${BUCKET}

gs://knative-atamel-charts/chart-cyprus.png
gs://knative-atamel-charts/chart-unitedkingdom.png
```

You should also get 2 emails with links to the charts!
