How to schedule and run BigQuery queries using Cloud Composer?

Cloud Composer

Cloud Composer is a fully managed workflow orchestration service in GCP. It is used to author, schedule, and monitor workflows, and it is built on the popular open source Apache Airflow project.

Apache Airflow

Apache Airflow is open source software that lets developers build data pipelines by writing Python scripts. Each script defines a directed acyclic graph (DAG), which tells the Airflow engine which tasks to execute, the order in which to execute them, and how often the workflow should run.
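To make the DAG idea concrete, here is a small sketch that does not depend on Airflow. It uses Python's standard graphlib module (Python 3.9+), and the task names are hypothetical placeholders. It shows that the dependency graph alone determines a valid execution order, and that a cycle makes such an order impossible. This is why Airflow requires the graph to be acyclic.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks it depends on (its upstream tasks).
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def execution_order(graph):
    """Return one order in which the tasks can run without breaking a dependency."""
    return list(TopologicalSorter(graph).static_order())


def is_acyclic(graph):
    """A scheduler can only order the tasks if the graph has no cycles."""
    try:
        TopologicalSorter(graph).prepare()
    except CycleError:
        return False
    return True


print(execution_order(pipeline))  # ['extract', 'transform', 'load']
print(is_acyclic({"a": {"b"}, "b": {"a"}}))  # False
```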

Steps to schedule and run BigQuery using Cloud Composer

Suppose we have a few queries that need to run daily in BigQuery, and we also want to control the order in which the SQL statements execute.

To do that, we can create an Airflow DAG and deploy it to a Cloud Composer environment. Let's walk through the steps to create the workflow and execute these tasks.

Step 1: Create a Cloud Composer environment in GCP

We are going to use the Cloud console to create the Cloud Composer environment. As shown below, we select Composer in the console to reach the page where environments are created.

Cloud Composer in Google Cloud Platform

In this window, we select the Create environment option. In the drop-down, we choose the Composer version, which determines the Airflow version and whether autoscaling is available. For this example, we select Composer 2, which runs Airflow 2 with autoscaling.

Create environment in Cloud Composer

Next, we give the environment a name and select the service account and the environment resources based on our requirements.

Environment creation in Cloud Composer

Finally, click the Create button to start creating the environment. Creating an environment takes approximately 25 minutes.

Configuration in Cloud Composer

Once the Cloud Composer environment is created, we can launch the Airflow web UI by selecting the Airflow web server option, as shown below.

Launch Airflow in Cloud Composer

The screen below is the Airflow home page of our Cloud Composer environment.

Airflow Home Page in Cloud Composer

Step 2: Access the GCS bucket to add or update the DAG

Cloud Composer uses a Cloud Storage bucket to store the DAGs of the environment. Let's check the GCS bucket.

As shown below, the GCS bucket us-central1-rc-test-workflo-d87a7a6a-bucket was created, named after our location and environment name.

We can also see the Airflow configuration files in this bucket. The DAG files are stored in the dags folder.

GCS bucket creation for Cloud Composer environment

Step 3: Create the DAG in Airflow

As mentioned earlier, we are going to create two tasks in the workflow. Task 1 inserts the records from customer_onboard_status into the compliance_base table. Task 2 inserts the compliance_base records into the compliance_analytics table.

  • Task 1: customer_onboard_status -> compliance_base
  • Task 2: compliance_base -> compliance_analytics

Let’s write the DAG in Python.

The DAG file has five parts:

  • import the required libraries
  • set the default arguments
  • instantiate the DAG object shared by all the task operators
  • define the tasks that run SQL files from template_searchpath
  • set the dependency between the tasks

Let's combine all these parts into a single file with the .py extension and place it in the dags folder of the GCS bucket.
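The original DAG code is not reproduced here. What follows is only a minimal sketch of what daily_refresh_rc_bigquery.py could look like. It assumes Airflow 2 with the Google provider package that Composer images ship with. Several details are assumptions, not the author's code:

- the choice of BigQueryInsertJobOperator
- the SQL file names compliance_base.sql and compliance_analytics.sql
- the default_args values
- catchup=False

The numbered comments follow the five parts of the file: imports, default arguments, DAG object, SQL tasks, and dependency.

```python
# daily_refresh_rc_bigquery.py: a sketch, not the original article's code.

# 1. Import the required libraries (Airflow 2 + Google provider).
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# 2. Default arguments applied to every task (values are assumptions).
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# 3. One DAG object shared by all the task operators.
with DAG(
    dag_id="daily_refresh_rc_bigquery",
    default_args=default_args,
    start_date=datetime(2023, 1, 1),  # assumption: any fixed date in the past
    schedule_interval="@daily",  # on Airflow 2.4+ the preferred argument name is `schedule`
    catchup=False,  # assumption: do not create runs for missed past days
    # Composer mounts the bucket at /home/airflow/gcs, i.e. gs://<bucket>/dags/scripts.
    template_searchpath=["/home/airflow/gcs/dags/scripts"],
) as dag:
    # 4. Each task runs one SQL file found in template_searchpath.
    #    The Jinja include is rendered because `configuration` is a templated field.
    compliance_base = BigQueryInsertJobOperator(
        task_id="compliance_base",
        configuration={
            "query": {
                "query": "{% include 'compliance_base.sql' %}",  # hypothetical file name
                "useLegacySql": False,
            }
        },
    )

    compliance_analytics = BigQueryInsertJobOperator(
        task_id="compliance_analytics",
        configuration={
            "query": {
                "query": "{% include 'compliance_analytics.sql' %}",  # hypothetical file name
                "useLegacySql": False,
            }
        },
    )

    # 5. compliance_analytics runs only after compliance_base succeeds.
    compliance_base >> compliance_analytics
```

In this sketch, the SQL files are expected to hold the INSERT statements for each task. The operator simply submits their rendered text to BigQuery as query jobs.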

As shown below, we have added our DAG file daily_refresh_rc_bigquery.py to the dags folder.

Add DAG in GCS bucket
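Uploading through the console works fine. For scripted deployments, a sketch using the google-cloud-storage client library might look like the following. It assumes the library is installed and that Application Default Credentials are allowed to write to the bucket. The helper names are hypothetical. Passing folder="dags/scripts" would upload the SQL files in the same way.

```python
import posixpath
from pathlib import Path


def bucket_object_name(local_file, folder="dags"):
    """Object name a local file gets inside a folder of the Composer bucket."""
    return posixpath.join(folder, Path(local_file).name)


def upload_to_composer_bucket(bucket_name, local_file, folder="dags"):
    """Upload local_file into `folder` of the environment bucket and return its gs:// URI."""
    # Imported here so bucket_object_name works without the client library installed.
    from google.cloud import storage

    client = storage.Client()  # uses Application Default Credentials
    blob = client.bucket(bucket_name).blob(bucket_object_name(local_file, folder))
    blob.upload_from_filename(local_file)
    return f"gs://{bucket_name}/{blob.name}"


# Example upload (needs credentials, so it is left commented out):
# upload_to_composer_bucket(
#     "us-central1-rc-test-workflo-d87a7a6a-bucket", "daily_refresh_rc_bigquery.py"
# )
print(bucket_object_name("daily_refresh_rc_bigquery.py"))  # dags/daily_refresh_rc_bigquery.py
```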

The Airflow web UI can take 60 to 90 seconds to reflect DAG changes. Let's verify our DAG in the UI.

DAG changes in Airflow web UI

Step 4: Place the SQL files in the template path

In our DAG file, we specified the template search path as /home/airflow/gcs/dags/scripts. Cloud Composer maps /home/airflow/gcs to the environment bucket, so this path corresponds to gs://bucket-name/dags/scripts.

We need to place our SQL files in this path so that the Airflow tasks can read the SQL from it when they execute.

SQL files placement in GCS bucket
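Composer mounts the environment bucket at /home/airflow/gcs on the Airflow workers. As a result, a local path under that mount translates directly into a gs:// URI. The following is a small sketch of that translation. The helper name is hypothetical, and bucket-name is the placeholder used in the text.

```python
import posixpath

# Local path at which Cloud Composer mounts the environment bucket.
COMPOSER_MOUNT = "/home/airflow/gcs"


def local_to_gcs_uri(local_path, bucket_name):
    """Translate a path under the Composer mount into the matching gs:// URI."""
    normalized = posixpath.normpath(local_path)
    if not normalized.startswith(COMPOSER_MOUNT + "/"):
        raise ValueError(f"{local_path} is not under {COMPOSER_MOUNT}")
    relative = normalized[len(COMPOSER_MOUNT) + 1:]
    return f"gs://{bucket_name}/{relative}"


print(local_to_gcs_uri("/home/airflow/gcs/dags/scripts", "bucket-name"))
# gs://bucket-name/dags/scripts
```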

Step 5: Verify the tasks in the Airflow DAG

Now we can open our DAG daily_refresh_rc_bigquery in Airflow and verify the tasks. Clicking the DAG name in the Airflow web UI takes us to the task details shown below.

Airflow DAG in Cloud Composer

As defined in our Python DAG file, the tasks compliance_base and compliance_analytics are created under the Airflow DAG daily_refresh_rc_bigquery, and the workflow is scheduled to run daily.
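Suppose the DAG uses the @daily preset, which is equivalent to the cron expression 0 0 * * *. One detail often surprises people: in Airflow 2, the run for a given day is triggered only after that day's data interval ends. The standard-library sketch below models this behaviour. It assumes a start_date that falls on midnight UTC, and it is an illustration, not Airflow's scheduler code.

```python
from datetime import datetime, timedelta

ONE_DAY = timedelta(days=1)


def daily_runs(start_date, count):
    """Return (logical_date, triggered_after) pairs for an '@daily' schedule.

    Models Airflow 2 cron semantics: the run whose data interval is
    [logical_date, logical_date + 1 day) is triggered once that interval ends.
    Assumes start_date falls exactly on midnight (UTC).
    """
    return [
        (start_date + i * ONE_DAY, start_date + (i + 1) * ONE_DAY)
        for i in range(count)
    ]


for logical_date, triggered_after in daily_runs(datetime(2023, 1, 1), 2):
    print(logical_date.date(), "->", triggered_after)
# 2023-01-01 -> 2023-01-02 00:00:00
# 2023-01-02 -> 2023-01-03 00:00:00
```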

To check our DAG code, we can click the Code option. This page also shows the colour legend Airflow uses for task states.

Currently, the workflow is displayed in the Tree view. We can switch to the Graph view by clicking the Graph option. Let's do that.

Graph view in Apache Airflow

The Graph view shows the tasks and their dependencies more clearly, as a directed acyclic graph (DAG). Here the tasks run one after another.

First, compliance_base is executed. Once it completes successfully, the next task, compliance_analytics, is executed.
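This ordering follows from Airflow's default trigger rule, all_success. Under that rule, a task starts only when all of its upstream tasks have succeeded; otherwise it is marked upstream_failed. The pure-Python model below is a simplified illustration. It ignores retries and the other trigger rules, and the outcome values passed in are hypothetical.

```python
def simulate_run(order, upstream, outcomes):
    """Model one DAG run under the default 'all_success' trigger rule.

    order:    task ids in dependency order
    upstream: task id -> set of upstream task ids
    outcomes: task id -> True/False, whether the task would succeed if it ran
    """
    states = {}
    for task in order:
        parents = upstream.get(task, set())
        if all(states[p] == "success" for p in parents):
            states[task] = "success" if outcomes[task] else "failed"
        else:
            # At least one upstream task did not succeed, so this one never starts.
            states[task] = "upstream_failed"
    return states


order = ["compliance_base", "compliance_analytics"]
upstream = {"compliance_analytics": {"compliance_base"}}

print(simulate_run(order, upstream, {"compliance_base": True, "compliance_analytics": True}))
# {'compliance_base': 'success', 'compliance_analytics': 'success'}
print(simulate_run(order, upstream, {"compliance_base": False, "compliance_analytics": True}))
# {'compliance_base': 'failed', 'compliance_analytics': 'upstream_failed'}
```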

Step 6: Run the Airflow DAG to execute the BigQuery queries

Now we can click the Run button in Airflow to execute the DAG. As shown below, we clicked the Trigger DAG option under the Run button.

Run Airflow DAG in UI
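Besides the UI button, a DAG run can be triggered through Airflow 2's stable REST API (POST /api/v1/dags/{dag_id}/dagRuns). The sketch below follows the authenticated-session approach used for Composer 2 web servers. It assumes the google-auth and requests packages are installed and that the caller's credentials are allowed to use the Airflow web server. The web server URL is a hypothetical placeholder.

```python
def trigger_request(web_server_url, dag_id, conf=None):
    """Build the URL and JSON body for POST /api/v1/dags/{dag_id}/dagRuns."""
    url = f"{web_server_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    return url, {"conf": conf or {}}


def trigger_dag(web_server_url, dag_id, conf=None):
    """Trigger a DAG run on a Composer 2 Airflow web server and return the API response."""
    # Imported here so trigger_request works without google-auth installed.
    import google.auth
    from google.auth.transport.requests import AuthorizedSession

    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    session = AuthorizedSession(credentials)
    url, body = trigger_request(web_server_url, dag_id, conf)
    response = session.post(url, json=body)
    response.raise_for_status()
    return response.json()


# Hypothetical URL; the real one is shown in the environment details in the console.
url, body = trigger_request(
    "https://example.composer.googleusercontent.com/", "daily_refresh_rc_bigquery"
)
print(url)
# https://example.composer.googleusercontent.com/api/v1/dags/daily_refresh_rc_bigquery/dagRuns
```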

Both tasks completed successfully, which means the queries ran in BigQuery.

BigQuery execution in Airflow

To check a task's log, we click the task. This opens a pop-up where we can select the log.

Log file in Apache Airflow

The records were inserted successfully into the BigQuery tables compliance_base and compliance_analytics.

Task 1:

BigQuery table : compliance_base

Task 2:

BigQuery table : compliance_analytics
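The same check can be scripted with the google-cloud-bigquery client library. This is a sketch that assumes the library is installed and credentials are configured. The fully qualified table IDs (project.dataset.table) are placeholders, because the article does not name the project or dataset.

```python
def count_sql(table_id):
    """SQL that counts the rows of a fully qualified table (project.dataset.table)."""
    return f"SELECT COUNT(*) AS row_count FROM `{table_id}`"


def row_count(table_id):
    """Run the count query in BigQuery and return the number of rows."""
    # Imported here so count_sql works without the client library installed.
    from google.cloud import bigquery

    client = bigquery.Client()  # uses Application Default Credentials
    rows = client.query(count_sql(table_id)).result()
    return next(iter(rows))["row_count"]


# Placeholder table IDs; replace them with the real project and dataset.
for table in ("my-project.my_dataset.compliance_base",
              "my-project.my_dataset.compliance_analytics"):
    print(count_sql(table))
```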

In this article, we learned how to schedule and run BigQuery queries using Cloud Composer in GCP.
