Building CI/CD with Airflow, GitLab and Terraform in GCP

The Ripple Data Engineering team is expanding, which means higher frequency changes to our data pipeline source code. This means we need to build better, more configurable and more collaborative tooling that prevents code collisions and enforces software engineering best practices. To ensure the quality of incoming features, the team sought to create a pipeline that automatically validates those features, builds them to verify their interoperability with existing features, and alerts the respective owners of any failures in the pipeline. These are pretty standard DevOps requirements, and to achieve them, our team implemented a Continuous Integration Continuous Deployment (CI/CD) approach for our data applications in Google Cloud Platform (GCP).

For our data applications, we use:

  • Airflow - to manage data services through GCP. Airflow embodies the concept of Directed Acyclic Graphs (DAGs), which are written in Python, to declare sequential task configurations that carry out our workflow.
  • Cloud Composer - for orchestrating our data pipelines' ETL jobs and scheduled tasks
  • GitLab - for source code management and to target multiple data environments for testing and quality assurance

This post breaks down our initial, modest solution for building a CI/CD deployment pipeline that leverages Cloud Composer, Airflow, and GitLab.

Initializing GitLab Pipelines

Pipelines are the structured, top-level way to configure continuous integration, delivery, and deployment in GitLab. You use a .gitlab-ci.yml file in a repo to define the CI/CD settings to invoke based on the triggers you define. For example, you could set up your repo to be triggered by incoming changes to your branch source code, which is what we wanted to do for our code pipeline.


When setting up CI/CD and Cloud Composer, many teams have leveraged Google Cloud Build (Option 1), Google’s native CI/CD platform that can authenticate and target cloud resources while performing deep security scans and packaging source artifacts. But, at the time of writing, Cloud Build does not support GitLab or generic git repositories and solely integrates with Cloud Source Repositories and GitHub. We use GitLab, so we had to create a GitLab Python Runner (Option 2).

Define the Build Stage

To set up a GitLab Python Runner as we did in Option 2, add a simple .gitlab-ci.yml file to your repo similar to the following example.

stages:
  - Build

# Build Repo
Build:
  stage: Build
  image:
    name: apache/airflow:1.10.11-python3.8
    entrypoint: [""]
  rules:
    - if: $CI_MERGE_REQUEST_ID
    - changes:
        - dags/*
        - slack/*
        - "**/*.{py}"
        - .gitlab-ci.yml
  script:
    # ADD YOUR REPO VALIDATION CODE HERE
    - echo "DAG REPO BUILD SUCCESSFUL"

Here's an explanation of the keys and values used in the example:

  • image -
    • name - The base docker image you want to use that contains all the necessary dependencies. To use a managed image, add the image name for the name field. We use the productionized Airflow docker image with the Python version aligned with our current code base.
  • rules - This section defines what triggers the pipeline, such as commits to your source code. In this example, we run our pipeline when:
    • a merge request is opened or receives a new commit (if: $CI_MERGE_REQUEST_ID)
    • a new commit has been pushed to any branch in the repo that has modified any of the targeted folders specified in the changes: fields
  • script - This is where you place the shell commands that will compile incoming source code. We’ll come back to this in the Build Validation and Code Compile section.

After you add the .gitlab-ci.yml file and push it to your repo, you will see a pipeline initiate based on the contents of your config YAML file.

Styling, Linting, and Standardization

To enforce best practices and check for any idiosyncratic syntax errors, we use styling and linting jobs in our pipeline. Our team configured a pre-commit hook framework to automatically identify syntax and style issues in our code on every commit, such as missing semicolons, trailing whitespace, and improperly named variables. By catching these issues before code review, peers can focus on the workflow logic and architecture being changed within the DAG rather than waste time on trivial style nitpicks.

To configure a git hook in a hooks directory that triggers the pre-commit actions on every git commit:

  1. Install pre-commit and attach the commit hook pipeline in your local repository:
    cd ~/path/to/project/root
    pip install pre-commit
    pre-commit install
  2. (Optional) If you need to run pre-commit without submitting a git commit, run:
    pre-commit run --all-files
  3. (Optional) If you need to allow commits in a repo without a pre-commit config, run:
    export PRE_COMMIT_ALLOW_NO_CONFIG=1
  4. Add the .pre-commit-config.yaml to your repo.
repos:
  - repo: https://github.com/prettier/prettier
    rev: "2.0.5"
    hooks:
      - id: prettier
  - repo: https://github.com/pre-commit/mirrors-autopep8
    rev: v1.5.3
    hooks:
      - id: autopep8
  - repo: https://github.com/asottile/seed-isort-config
    rev: v2.2.0
    hooks:
      - id: seed-isort-config
  - repo: https://github.com/pre-commit/mirrors-isort
    rev: v4.3.21
    hooks:
      - id: isort
  - repo: https://github.com/psf/black
    rev: 19.10b0
    hooks:
      - id: black
        language_version: python3
  - repo: https://github.com/PyCQA/flake8
    rev: 3.8.3
    hooks:
      - id: flake8


In this example YAML file, all local git commits are verified through a pipeline of scripts that format and test for linting. The linting and styling checks that this file defines include:

  • prettier - Formats markdown files to conform to the CommonMark specification of Markdown.
  • autopep8 - Automatically formats Python code to conform to the PEP 8 style guide.
  • seed-isort-config - Statically populates the known_third_party isort setting in Python.
  • isort - Sorts Python imports alphabetically and automatically separates them into sections and by type.
  • black - Enforces formatting that conforms to PEP 8.
  • flake8 - Enforces pycodestyle rules and flags common programming errors.

Add the .flake8 and .isort.cfg files to your repo so that you can adjust those linters to meet your needs. For more information about how to adjust the settings for the linters, see flake8-docs and isort-docs.
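As a starting point, minimal versions of those two config files might look like the following (the specific settings and the known_third_party seeds are our suggestions, not prescriptions; black's default line length of 88 is assumed):

```ini
# .flake8 (suggested starting point)
[flake8]
max-line-length = 88
# E203 and W503 are commonly ignored for compatibility with black
extend-ignore = E203, W503

# .isort.cfg (suggested starting point)
[settings]
line_length = 88
multi_line_output = 3
include_trailing_comma = True
# seed-isort-config populates this field automatically
known_third_party = airflow,yaml
```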

Build Validation and Code Compile

An Airflow DAG is structural task code, but that doesn't make it any different from other Python scripts. This means we can check whether the script compiles, verify targeted dependencies are installed, and ensure variables are correctly declared. We want to “fail fast” to minimize the duration of a commit job from a feature branch. This build validation and DAG cross-check can be separated into three primary tasks:

Python Build Validation

Let's revisit our Build stage.

# Build Repo
Build:
  stage: Build
  image:
    name: apache/airflow:1.10.11-python3.8
    entrypoint: [""]
  rules:
    - if: $CI_MERGE_REQUEST_ID
    - changes:
        - dags/*
        - slack/*
        - "**/*.{py}"
        - .gitlab-ci.yml
  script:
    - pip install -r requirements.txt --user
    - export PYTHONPATH="${PYTHONPATH}:/tmp/slack"
    - export $(cat .env/.devenv | xargs)
    - |
      for dag in $(find . -name '*.py');do
        if [[ "${dag}" =~ "dags/" ]]; then 
          echo "VALIDATING: ${dag}"
          python ${dag} 
        fi
      done
      echo "DAG REPO BUILD SUCCESSFUL"

Here, we updated the script so that the image we use installs all the correct Python dependencies listed in our requirements.txt file. For example, our pipelines depend on apache-airflow[gcp], PyYAML, and zipp. Then we simply iterate through our repo and run python {DAG_NAME}.py on each file in our dags/ directory to ensure the DAG compiles with the installed Python interpreter version. If all dependencies are installed and the DAG is correctly configured, we should see no failures.
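If you prefer to keep the compile check in Python rather than shell, a minimal sketch using the standard-library py_compile module looks like this (the dags directory name is assumed). Note that unlike executing the file with the interpreter, this only verifies syntax; it will not catch missing imports or misconfigured variables:

```python
import pathlib
import py_compile


def validate_dag_syntax(root="dags"):
    """Byte-compile every .py file under root, collecting any failures."""
    failures = []
    for path in sorted(pathlib.Path(root).rglob("*.py")):
        try:
            py_compile.compile(str(path), doraise=True)
        except py_compile.PyCompileError as exc:
            failures.append((str(path), str(exc)))
    return failures
```

A non-empty return value can then be used to fail the build stage.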

Airflow Quality Assurance

Next, we need to know if the Airflow engine can parse our DAGs and find the task objects. To verify that our DAGs are successfully added to the Airflow platform DagBag and that the expected DAG configurations are set, we can create a dag_qa_validation.py file. That file calls the Airflow engine's DagBag directly to confirm that our DAGs load, and invokes smoke tests that exercise the Airflow platform programmatically to confirm each DAG behaves the way we expect.

Our smoke tests verify that:

  • There are no syntax issues or environment compatibility issues.
  • All DAGs can be parsed in under 2 seconds.
  • Each DAG filename matches the dag_id.
  • Owners are set for all DAGs.
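A sketch of what dag_qa_validation.py might contain (the helper name and time budget are our assumptions; the DagBag import is kept inside the function so the helper stays importable without an Airflow installation):

```python
import os


def filename_matches_dag_id(fileloc, dag_id):
    """A DAG defined in dags/foo.py should declare dag_id "foo"."""
    return os.path.splitext(os.path.basename(fileloc))[0] == dag_id


def run_smoke_tests(max_parse_seconds=2.0):
    """Load the DagBag and enforce our DAG conventions."""
    # Imported here so the helper above stays usable without Airflow installed.
    from airflow.models import DagBag

    dagbag = DagBag(include_examples=False)

    # No syntax or environment compatibility issues.
    assert not dagbag.import_errors, "Import errors: %s" % dagbag.import_errors

    # All DAGs parse under the time budget (duration is a timedelta in
    # some Airflow versions and a float in others).
    for stat in dagbag.dagbag_stats:
        dur = stat.duration
        seconds = dur.total_seconds() if hasattr(dur, "total_seconds") else dur
        assert seconds < max_parse_seconds, "%s parses too slowly" % stat.file

    for dag_id, dag in dagbag.dags.items():
        # Each DAG filename matches the dag_id, and owners are set.
        assert filename_matches_dag_id(dag.fileloc, dag_id), dag_id
        assert dag.owner, "%s has no owner" % dag_id

    print("QA VALIDATION SUCCESSFUL")
```

The real file would end with an `if __name__ == "__main__": run_smoke_tests()` guard so the runner's `python tests/dag_qa_validation.py` step executes the checks.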

# Validate quality assurance for all DAGs
QA Validation:
  stage: QA Validation
  rules:
    - if: $CI_MERGE_REQUEST_ID
    - if: $CI_COMMIT_BRANCH == "master"
  image:
    name: apache/airflow:1.10.11-python3.8
    entrypoint: [""]
  script:
    - pip install -r requirements.txt --user
    - export PYTHONPATH="${PYTHONPATH}:/tmp/slack"
    - export AIRFLOW_HOME="$(pwd)"
    - export $(cat .env/.devenv | xargs)
    - airflow initdb
    - airflow list_dags
    - python tests/dag_qa_validation.py

In this example, we added a QA Validation stage to our GitLab pipeline runner. Here's what the runner does:

  • We install all the dependencies for our DAGs listed in the requirements.txt file on the runner agent that was provisioned using the Airflow image:

    pip install -r requirements.txt --user

  • We export environment variables to keep the source code DRY (Don't Repeat Yourself) and to handle our staging environments. Each DAG reads the exported environment variables, which are populated with the cloud resources to reference for the target stage we're in.

    export PYTHONPATH="${PYTHONPATH}:/tmp/slack"
    export AIRFLOW_HOME="$(pwd)"

  • Since we want to run the real Airflow engine, we set up a database backend:
    airflow initdb

  • Because Airflow is built to interact with its metadata database, we list the DAGs to verify that the engine can discover them:

    airflow list_dags

  • Finally, we invoke our QA tests:

    python tests/dag_qa_validation.py
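The .env/.devenv file exported above is a plain list of KEY=value pairs; a hypothetical example (all names invented) might look like the following. Because Airflow exposes environment variables prefixed with AIRFLOW_VAR_ as Airflow Variables, each DAG can read these values with Variable.get() without any stage-specific code:

```
AIRFLOW_VAR_COMPOSER_PROJECT=example-dev-project
AIRFLOW_VAR_COMPOSER_LOCATION=us-central1
AIRFLOW_VAR_COMPOSER_ENVIRONMENT=example-dev-composer
AIRFLOW_VAR_COMPOSER_BUCKET=gs://example-dev-composer-bucket
```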

DAG Unit Testing

Testing is underdeveloped in the data analytics world. A lot of big data warehousing projects have virtually no testing. Although SQL is a declarative language and is thus less error-prone, you can still make mistakes. The lack of automated testing is a big pain point, and solutions continue to be explored.

In our project, we use unit tests: individual checks for each respective DAG in your source repository that test task logic and functionality and assert expected task values. This is a very well-documented area for Python but a newly explored space for Airflow. How do you correctly mock and assert Airflow operator communications and values without invoking the Operator API? This post doesn't cover this problem, but a number of other developers are beginning to spearhead this effort in Airflow.

Authentication and Access

Terraform

Now that we have build and code validation stages, we can deploy our code to staging environments for additional testing. To do this, we need to store GCP access tokens in GitLab to allow our runner to deploy our DAGs to GCS buckets in targeted environments. Additionally, when we deploy code to Cloud Composer, our runner will need permissions to modify the composer environment and write to the composer bucket. We use Terraform to manage our infrastructure in GCP.

First, we’ll add a service account with the appropriate permissions in the Terraform repo that maintains our Cloud Composer environments.

Here is an example of adding a new service account resource to our dev composer project with the appropriate permissions to modify the composer environment and storage.

# -------- Composer gitlab service account for DEV stage runner --------

module "datateam-dev-composer-gitlab-runner-srv-acct" {
  source = "../../modules/gcp-service-account/"
  project_id = "${google_project.datateam-dev-composer-project.project_id}"
  account_id = "datateam-dev-composer-gitlab-runner1"
  display_name = "Dev Composer GitLab Service Account"
}

We assign the composer.environmentAndStorageObjectAdmin role to the newly created service account for each composer environment.

# Composer gitlab environment and storage admin role required to update composer environment variables
resource "google_project_iam_member" "datateam-dev-composer-gitlab-runner" {
  project = "${google_project.datateam-dev-composer-project.project_id}"
  role = "roles/composer.environmentAndStorageObjectAdmin"
  member = "serviceAccount:${module.datateam-dev-composer-gitlab-runner-srv-acct.service_account_email}"
}
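The GitLab variable in the next step needs a JSON key for this service account; a sketch of creating one with Terraform (the resource name is our assumption) might look like:

```hcl
# Generate a JSON key for the runner service account; the private_key
# attribute is base64-encoded, which is why it is decoded before being
# handed to GitLab.
resource "google_service_account_key" "datateam-dev-composer-gitlab-runner-srv-acct-key" {
  service_account_id = "${module.datateam-dev-composer-gitlab-runner-srv-acct.service_account_email}"
}
```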

Next, we add an instance of a gitlab_project_variable that outputs the JSON key of the service account to a GitLab project variable to be used in our pipeline runner.

# Add a Staging SA JSON to the datateam-composer-dags repo project (001)
resource "gitlab_project_variable" "datateam_staging_gitlab_composer_dags_sa_key" {
    project = "001"
    key = "Datateam_STAGING_GCLOUD_PRIVATE_KEY_JSON"
    value = "${base64decode(google_service_account_key.datateam-staging-composer-gitlab-runner-srv-acct-2-key.private_key)}"
    variable_type = "file"
    environment_scope = "All"
}

Here's an explanation of the fields in the code example:

  • project - Your GitLab project_id
  • key - An arbitrary name for your key
  • value - The output of your service account key, decoded from base64
  • variable_type - Set to file, which is required for JSON key files.

    You can also configure team-level variables for the Service Account key to use across multiple GitLab projects. Check out the gitlab_group_variable resource for more details.
  • environment_scope - Allows you to scope the accessibility of the variable to certain branches

GitLab Pipeline

Once the new changes to the Terraform infrastructure are defined, add your variable as a reference in your pipeline YAML file to be used by gsutil in your script.

.gcloud_stage_init: &gcloud_stage_init
  - if [ -f "$DATATEAM_STAGING_GCLOUD_PRIVATE_KEY_JSON" ]; then gcloud auth activate-service-account --key-file $DATATEAM_STAGING_GCLOUD_PRIVATE_KEY_JSON; fi

Deploy Staging DAG:
  <<: *gsutil_image
  stage: DAG Deployment
  environment:
    name: staging
    url: <your Airflow UI URL>
  rules:
    - if: $CI_MERGE_REQUEST_ID
      when: manual
  before_script:
    - *gcloud_stage_init
  script:
    - echo "To be added"

Here we add gcloud_stage_init, which configures the GCP CLI so that our gsutil and gcloud composer environments commands can update Cloud Composer and write our DAGs to the composer bucket.

Staging and Cloud Deployment

You will need to repeat the steps mentioned in the Authentication and Access section and create access keys for each environment in your data project.


In the sample script above, we created a stage called DAG Deployment, which runs only when a commit is pushed to a merge request and is manually invoked by a user.

  rules:
    - if: $CI_MERGE_REQUEST_ID
      when: manual

The manual delivery setting is a deployment measure to ensure there is a human in the loop prior to deploying to a targeted stage to assure quality. (If you want to automatically invoke this step in your pipeline, you can omit when: manual in the code.)

To synchronize the contents of our GitLab repo to the buckets/directories, we run gsutil -m rsync.

To update the environment variables that the DAGs use in Airflow, we run gcloud composer environments update:

script:
    - export $(cat .env/.stageenv | xargs)
    - readarray -t env_arr <.env/.stageenv
    # Join the file's KEY=value lines into the comma-separated list that --update-env-variables expects
    - env_list=$(IFS=,; echo "${env_arr[*]}")
    - gsutil -m rsync -d -r . $AIRFLOW_VAR_COMPOSER_BUCKET/dags/
    - gcloud composer environments update $AIRFLOW_VAR_COMPOSER_ENVIRONMENT --project $AIRFLOW_VAR_COMPOSER_PROJECT --location $AIRFLOW_VAR_COMPOSER_LOCATION --update-env-variables=$env_list

Alerting and Slack Notifications

Our team wanted to be notified when CI/CD pipeline failures occurred. Alerting is the mechanism that allows the pipeline to send notifications to the team via their preferred communication channel, which, in our case, is Slack.

In our environment, alerting is configured at three levels:

  • GCP - Send alerts based on the health of your infrastructure and cloud resources
  • GitLab - Send alerts based on the events of the repo CI/CD pipeline
  • Application / Airflow - Application alerts that send the logs of failure. In this case, the application is a DAG in Airflow

Before configuring any of these alerting settings, you may need to get a webhook to authenticate against your organization's Slack workspace and write messages to a targeted channel. Contact your IT administrator for more information.

Configuring GCP Alerting

GCP alerting for resources is provided through Alerting Policies and can be configured in the Monitoring console. In this example, we set an alerting policy on our production environment's Cloud Composer health status. The condition we use in this policy monitors an Airflow webserver metric whose value is 0 when the composer health check has failed.
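Sketched in Terraform (the display names, threshold, and notification channel are our assumptions, not the team's actual configuration), such a policy might look like:

```hcl
resource "google_monitoring_alert_policy" "composer_health" {
  display_name = "Prod Composer Health"
  combiner     = "OR"

  conditions {
    display_name = "Composer environment unhealthy"
    condition_threshold {
      # composer.googleapis.com/environment/healthy reports 1 when healthy,
      # 0 when the health check has failed
      filter          = "metric.type=\"composer.googleapis.com/environment/healthy\" AND resource.type=\"cloud_composer_environment\""
      comparison      = "COMPARISON_LT"
      threshold_value = 1
      duration        = "300s"
    }
  }

  # Assumes a Slack notification channel is defined elsewhere in the repo
  notification_channels = ["${google_monitoring_notification_channel.slack_channel.name}"]
}
```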

Configuring GitLab Alerting

To configure alerting in GitLab:

  1. Navigate to the GitLab repo.
  2. Confirm that you have maintainer permissions for the project to configure settings.
  3. In Settings, select Webhooks and choose which event triggers you want to invoke alerts that will be shared through your webhook.
  4. Add the webhook to the Slack channel you want to target for GitLab Runner failures.

Configuring Airflow Alerting

When a Composer job fails, the failure invokes a notification in Slack. Our repo contains a slack module that is deployed to the DAGs bucket of the composer environment.

To set up this alerting:

  1. In the Airflow UI, under Admin > Variables, create a variable:
  • key: ENVIRONMENT_NAME
  • value: The appropriate environment name (e.g., Production or Staging). The Slack notification uses this variable to differentiate the environment in which the failure occurred.
  2. Create a Slack webhook and connection by configuring the proper credentials in the Airflow UI under Admin > Connections.
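The failure callback in our slack module can be sketched roughly as follows (the connection id, message format, and function names are our assumptions; the Airflow imports are kept inside the callback so the message builder remains plain Python):

```python
def build_failure_message(env, dag_id, task_id, log_url):
    """Compose the Slack text for a failed task instance."""
    return (
        ":red_circle: Task failed in %s\n"
        "*DAG*: %s\n"
        "*Task*: %s\n"
        "*Log*: %s" % (env, dag_id, task_id, log_url)
    )


def task_fail_slack_alert(context):
    """on_failure_callback that posts the failure details to Slack."""
    # Airflow 1.10 import paths; these moved under airflow.providers in 2.x.
    from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
    from airflow.models import Variable

    ti = context["task_instance"]
    alert = SlackWebhookOperator(
        task_id="slack_failure_alert",
        http_conn_id="slack_webhook",  # the connection created under Admin > Connections
        message=build_failure_message(
            Variable.get("ENVIRONMENT_NAME", default_var="Unknown"),
            ti.dag_id,
            ti.task_id,
            ti.log_url,
        ),
    )
    return alert.execute(context=context)
```

A DAG can then opt in by passing on_failure_callback=task_fail_slack_alert in its default_args.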

For more information on configuring Airflow Slack Alerts, check out this useful article.

Next Steps

As mentioned earlier, this is a modest solution for building a CI/CD process that works for our current implementation of data pipelines. The benefit of this approach is the flexibility to easily reconfigure its components to support our other applications in NodeJS, Java, and Golang.

In the future, we want to continue improving this pipeline. Here are a few things that we may tweak:

  • Leveraging Airflow 2.0, which is slated to support DAG versioning
  • Replacing our rsync for DAG deployment with something more robust as our DAG repo expands
  • Adding concrete examples of Unit Tests that mock and assert our task logic

If you’re interested in working on this type of project or other emerging blockchain and digital asset technologies, we’re hiring!