
Originally created at Airbnb in 2014, Airflow is an open-source data orchestration framework that allows developers to programmatically author, schedule, and monitor data pipelines. Airflow experience is one of the most in-demand technical skills for Data Engineering (another one is Oozie) as it is listed as a skill requirement in many Data Engineer job postings.
In this blog post, I will explain core concepts and workflow creation in Airflow, with source code examples to help you create your first data pipeline using Airflow.
Concepts
Here are the basic concepts and terms frequently used in Airflow:
DAG: In Airflow, a DAG (Directed Acyclic Graph) is a group of tasks that have some dependencies on each other and run on a schedule. Each DAG is equivalent to a logical workflow. A DAG Run is a specific run of the DAG.
Operator: An operator is a Python class that acts as a template for a certain type of job, for example:
- BashOperator: execute a bash command
- PythonOperator: run a Python function
- PythonVirtualenvOperator: run a function in a virtual environment that is created and destroyed automatically
- BigQueryOperator: query and process data in BigQuery
- PapermillOperator: execute a Jupyter Notebook
Task: Once an operator is instantiated with specific arguments, it becomes a task.
Task Instance: A task instance represents a specific run of a task and it has a state, for example: “running”, “success”, “failed”, “skipped”, “up for retry”, etc.
Workflow Creation
A DAG (aka a workflow) is defined in a Python file stored in Airflow’s DAG_FOLDER and contains 3 main components: the DAG definition, tasks, and task dependencies.
Default Arguments
When the default_argsdictionaryis passed to a DAG, it applies to all tasks belonging to the DAG:
default_args = { 'owner': 'xinran.waibel', 'start_date': datetime(2019, 12, 1), 'retries': 1, 'on_failure_callback': slack_failure_msg}
Some useful parameters:
- start_date: The execution_date for the first DAG run.
- end_date: The date the DAG should stop running (usually none).
- execution_timeout: The maximum times a task can run.
- retries: The number of retries that can be performed before the task fails.
- retry_delay: The delay time between retries.
- depends_on_past: When it is set to true, a task instance will only run if the previously scheduled task instance succeeds.
- on_failure_callback: The function to be called when a task instance fails.
DAG Definition
# Create a DAG using context manager (with...as...)
# Benefits of context manager: all tasks within context manager is automatically assigned to the DAG so you don't have to explicitly set dag=dag for all tasks.
with DAG('DAG_NAME', default_args=default_args, schedule_interval='@once') as dag:
# Create SQL task A using BigQueryOperator
bq_sql_task_a = bigquery_operator.BigQueryOperator(
task_id='demo_bq_sql_a',
sql="INSERT INTO TABLE TABLE_B SELECT * FROM TABLE_A",
use_legacy_sql=False)
# Create SQL task B
bq_sql_task_b = bigquery_operator.BigQueryOperator(
task_id='demo_bq_sql_b',
sql="INSERT INTO TABLE TABLE_C SELECT * FROM TABLE_B",
use_legacy_sql=False)
# Set dependency between A and B: B depends on A
bq_sql_task_a >> bq_sql_task_b
# Create Slack notification task using SlackAPIPostOperator
slack_msg_task = SlackAPIPostOperator(
task_id='slack_msg',
channel='data_notifications',
token=os.environ['SLACK_API_TOKEN'],
text="""
:white_check_mark: Workflow Succeeded
*Dag*: {dag}
*DAG Run ID*: {dag_run_id}
""".format(dag=dag.dag_id, dag_run_id='{{ run_id }}'))
# Slack task depends on both A and B
[bq_sql_task_a, bq_sql_task_b] >> slack_msg_task
Some useful parameters for DAG constructor:
- schedule_interval: A cron expression to specify how often the DAG should run.
- catchup: Turning catchup off is recommended if your DAG performs backfill internally.
DAG files need to be evaluated quickly (in seconds) since the scheduler will execute them periodically (around every minute) to reflect the workflow changes, if any. Thus, don’t perform actual data processing in DAG files.
Task Dependency
Currently, there are two main ways to set dependencies between tasks:
- Python’s bitshift operators (
>>and<<) set_upstream()andset_downstream()methods
# Task B depends on Task A and Task C depends on Task B
task_a >> task_b >> task_c# Task D depends on Task C
task_c.set_downstream(task_d)
You can also define dependencies among multiple tasks using Python’s list:
# Task C will run after both Task A and B complete
[task_a, task_b] >> task_c
Use chain() function to define a sequential dependency:
from airflow.utils.helpers import chain# Both Task B and C depend on Task A
# Task D depends on both Task B and C
chain(task_a, [task_b, task_c], task_d)# The statement above is equivalent to:
task_a >> [task_b, task_c] >> task_d
Use cross_downstream() to set dependencies between two groups of tasks:
from airflow.utils.helpers import cross_downstream# Task C and D will run after both Task A and B complete
cross_downstream([task_a, task_b], [task_c, task_d])# The statement above is equivalent to:
[task_a, task_b] >> task_c
[task_a, task_b] >> task_d
Congrats! You just learned how to create a data workflow using Airflow.
You can simply deploy your workflow by adding the DAG file and any dependency files to Airflow’s DAG_FOLDER and Airflow will automatically pick it up. Then, you can use Airflow’s built-in web UI to monitor and debug your workflow at any time (which is very straightforward so I won’t cover details here).
Ready to learn more about Airflow? Check out Airflow Tips and Best Practices!