Build Data Pipelines with Apache Airflow

From: https://towardsdatascience.com/https-medium-com-xinran-waibel-build-data-pipelines-with-apache-airflow-808a4de79047

Originally created at Airbnb in 2014, Airflow is an open-source data orchestration framework that allows developers to programmatically author, schedule, and monitor data pipelines. Airflow experience is one of the most in-demand technical skills for Data Engineering (another is Oozie), as it appears as a requirement in many Data Engineer job postings.

In this blog post, I will explain core concepts and workflow creation in Airflow, with source code examples to help you create your first data pipeline using Airflow.


Concepts

Here are the basic concepts and terms frequently used in Airflow:

DAG: In Airflow, a DAG (Directed Acyclic Graph) is a group of tasks that have some dependencies on each other and run on a schedule. Each DAG is equivalent to a logical workflow. A DAG Run is a specific run of the DAG.

Operator: An operator is a Python class that acts as a template for a certain type of job, for example: BashOperator (runs a bash command), PythonOperator (calls a Python function), or BigQueryOperator (runs a BigQuery SQL query).

Task: Once an operator is instantiated with specific arguments, it becomes a task.

Task Instance: A task instance represents a specific run of a task and it has a state, for example: “running”, “success”, “failed”, “skipped”, “up for retry”, etc.
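To make the operator-vs-task distinction concrete, here is a toy, Airflow-free sketch (the EchoOperator class below is purely illustrative, not a real Airflow operator): the class is the reusable template, and each instantiation with concrete arguments becomes a distinct task.

```python
# Toy illustration only -- not real Airflow code.
class EchoOperator:
    """A stand-in for an Airflow operator: a template for one type of job."""
    def __init__(self, task_id, message):
        self.task_id = task_id   # every task needs an id unique within its DAG
        self.message = message

    def execute(self):
        """One call here is analogous to one task instance run."""
        return f"[{self.task_id}] {self.message}"

# Instantiating the operator with specific arguments yields two distinct tasks:
task_a = EchoOperator(task_id="say_hello", message="hello")
task_b = EchoOperator(task_id="say_bye", message="bye")

print(task_a.execute())  # → [say_hello] hello
print(task_b.execute())  # → [say_bye] bye
```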

Workflow Creation

A DAG (aka a workflow) is defined in a Python file stored in Airflow’s DAG_FOLDER and contains 3 main components: the DAG definition, tasks, and task dependencies.

Default Arguments

When the default_args dictionary is passed to a DAG, it applies to all tasks belonging to the DAG:

default_args = {
  'owner': 'xinran.waibel',
  'start_date': datetime(2019, 12, 1),
  'retries': 1,
  'on_failure_callback': slack_failure_msg
}

Some useful parameters:

  • start_date: The execution_date for the first DAG run.
  • end_date: The date the DAG should stop running (usually none).
  • execution_timeout: The maximum amount of time a task instance is allowed to run; the task fails if it exceeds this limit.
  • retries: The number of retries that can be performed before the task fails.
  • retry_delay: The delay time between retries.
  • depends_on_past: When set to True, a task instance will only run if the same task's instance in the previous scheduled DAG run succeeded.
  • on_failure_callback: The function to be called when a task instance fails.
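The slack_failure_msg callback referenced in default_args above is not shown in this post. Below is a minimal sketch of what such a callback could look like: Airflow calls on_failure_callback with a context dictionary containing keys such as 'task_instance' and 'execution_date'. The send_slack_message helper is a hypothetical stand-in for whatever Slack client you use (e.g. the slack_sdk WebClient).

```python
def send_slack_message(text):
    # Hypothetical stand-in: replace with a real Slack client call.
    print(text)

def slack_failure_msg(context):
    """Called by Airflow when a task instance fails.

    `context` is the dictionary Airflow passes to callbacks; among other
    keys it includes 'task_instance' and 'execution_date'.
    """
    ti = context['task_instance']
    msg = (
        ":x: Task Failed\n"
        f"*Task*: {ti.task_id}\n"
        f"*Dag*: {ti.dag_id}\n"
        f"*Execution Date*: {context['execution_date']}"
    )
    send_slack_message(msg)
    return msg
```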

DAG Definition

import os
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators import bigquery_operator
from airflow.operators.slack_operator import SlackAPIPostOperator

# Create a DAG using a context manager (with...as...)
# Benefit of the context manager: all tasks within it are automatically
# assigned to the DAG, so you don't have to explicitly set dag=dag for every task.
with DAG('DAG_NAME', default_args=default_args, schedule_interval='@once') as dag:
  
  # Create SQL task A using BigQueryOperator
  bq_sql_task_a = bigquery_operator.BigQueryOperator(
    task_id='demo_bq_sql_a',
    sql="INSERT INTO TABLE_B SELECT * FROM TABLE_A",
    use_legacy_sql=False)
  
  # Create SQL task B
  bq_sql_task_b = bigquery_operator.BigQueryOperator(
    task_id='demo_bq_sql_b',
    sql="INSERT INTO TABLE_C SELECT * FROM TABLE_B",
    use_legacy_sql=False)
  
  # Set dependency between A and B: B depends on A
  bq_sql_task_a >> bq_sql_task_b
  
  # Create Slack notification task using SlackAPIPostOperator
  slack_msg_task = SlackAPIPostOperator(
    task_id='slack_msg',
    channel='data_notifications',
    token=os.environ['SLACK_API_TOKEN'],
    text="""
    :white_check_mark: Workflow Succeeded
    *Dag*: {dag}
    *DAG Run ID*: {dag_run_id}
    """.format(dag=dag.dag_id, dag_run_id='{{ run_id }}'))
  
  # Slack task depends on both A and B
  [bq_sql_task_a, bq_sql_task_b] >> slack_msg_task

Some useful parameters for the DAG constructor:

  • dag_id: The unique identifier of the DAG.
  • schedule_interval: How often the DAG runs, defined as a cron expression, a timedelta, or a preset such as @daily or @once.
  • default_args: The dictionary of default arguments applied to all of the DAG's tasks.
  • catchup: Whether the scheduler should backfill DAG runs for past schedule intervals that haven't run yet.
  • max_active_runs: The maximum number of active DAG runs allowed at a time.

DAG files need to be evaluated quickly (in seconds) since the scheduler will execute them periodically (around every minute) to reflect the workflow changes, if any. Thus, don’t perform actual data processing in DAG files.
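One way to honor this rule is to defer any expensive work into a callable that an operator runs later, so importing the DAG file stays cheap. A minimal, Airflow-free sketch of the difference (the load_data name is illustrative; in Airflow you would hand the function to e.g. PythonOperator via python_callable):

```python
executed = []

# BAD: calling something expensive at module top level would run it on
# every scheduler parse of the DAG file (roughly once a minute).
# rows = expensive_extract()

# GOOD: wrap the expensive work in a function; it runs only when the
# task itself is executed, not when the DAG file is parsed.
def load_data():
    """Placeholder for real data processing."""
    executed.append(True)
    return "done"

# At DAG-parse time the function is merely defined, never called:
assert executed == []

# Only when the scheduler actually runs the task does the work happen:
result = load_data()
print(result, executed)  # → done [True]
```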

Task Dependency

Currently, there are two main ways to set dependencies between tasks:

  • Python’s bitshift operators (>> and <<)
  • set_upstream() and set_downstream() methods
# Task B depends on Task A and Task C depends on Task B
task_a >> task_b >> task_c

# Task D depends on Task C
task_c.set_downstream(task_d)

You can also define dependencies among multiple tasks using Python’s list:

# Task C will run after both Task A and B complete
[task_a, task_b] >> task_c

Use chain() function to define a sequential dependency:

from airflow.utils.helpers import chain

# Both Task B and C depend on Task A
# Task D depends on both Task B and C
chain(task_a, [task_b, task_c], task_d)

# The statement above is equivalent to:
task_a >> [task_b, task_c] >> task_d

Use cross_downstream() to set dependencies between two groups of tasks:

from airflow.utils.helpers import cross_downstream

# Task C and D will run after both Task A and B complete
cross_downstream([task_a, task_b], [task_c, task_d])

# The statement above is equivalent to:
[task_a, task_b] >> task_c
[task_a, task_b] >> task_d
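If you want to convince yourself what these dependency expressions wire up, a toy Task class that implements the same >> convention (below; this is a sketch, not Airflow's actual implementation) makes the resulting upstream sets easy to inspect:

```python
class Task:
    """Toy stand-in for an Airflow task, supporting the >> convention."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()   # ids of the tasks this task depends on

    def __rshift__(self, other):       # self >> other, or self >> [others]
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            t.upstream.add(self.task_id)
        return other                   # returning `other` enables chaining

    def __rrshift__(self, others):     # [tasks] >> self
        for t in others:
            self.upstream.add(t.task_id)
        return self

task_a, task_b, task_c, task_d = (Task(i) for i in "abcd")

# Same shape as chain(task_a, [task_b, task_c], task_d):
task_a >> [task_b, task_c] >> task_d

print(sorted(task_b.upstream))  # → ['a']
print(sorted(task_d.upstream))  # → ['b', 'c']
```

Returning `other` from `__rshift__` is what lets `a >> [b, c] >> d` chain left to right, which mirrors how Airflow's own bitshift composition behaves.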

Congrats! You just learned how to create a data workflow using Airflow.

You can simply deploy your workflow by adding the DAG file and any dependency files to Airflow’s DAG_FOLDER and Airflow will automatically pick it up. Then, you can use Airflow’s built-in web UI to monitor and debug your workflow at any time (which is very straightforward so I won’t cover details here).

Ready to learn more about Airflow? Check out Airflow Tips and Best Practices!
