Airflow – Dynamic DAGs

Introduction

Over the last couple of months I have received a lot of queries about dynamic DAGs, so here is a post on dynamic DAGs in Airflow. Dynamic tasks are probably one of the best features of Airflow: they let you generate tasks programmatically inside an Airflow DAG. A simple use case is launching a shell script with a different parameter for each element of a list, all at the same time. To make things more interesting, the size of the list changes all the time. This is where Airflow really shines. Let’s see how.

If you are new to Airflow, read these posts first to get some background before jumping into this one.

Airflow DAG – Dynamic Tasks – Example-1

Creating dynamic tasks in a DAG is pretty simple. For the purpose of this section, we will create dynamic tasks based on a range of numbers, just to illustrate the point. Let’s take a look at the DAG below.

Airflow DAG

# Filename: hello_dynamic_tasks_eg1.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# Step 1 - Define the default arguments for DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 18),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Step 2 - Declare a DAG with default arguments
dag = DAG('hello_dynamic_tasks_eg1',
          schedule_interval='0 8 * * *',
          default_args=default_args,
          catchup=False
          )

# Step 3 - Declare dummy start and stop tasks
start_task = DummyOperator(task_id='start', dag=dag)
end_task = DummyOperator(task_id='end', dag=dag)

# Step 4 - Create dynamic tasks for a range of numbers
for i in range(1, 4):
    # Step 4a - Declare the task
    t1 = BashOperator(
        task_id='task_t' + str(i),
        bash_command='echo hello from task:'+str(i),
        dag=dag
    )
    # Step 4b - Define the sequence of execution of tasks
    start_task >> t1 >> end_task

Analyse Airflow DAG

Let’s analyse the code quickly

  • Step 1 – Declare a dictionary of default arguments for DAG
  • Step 2 – Create a DAG with the default arguments
  • Step 3 – Create two dummy tasks
    • A dummy task for beginning the DAG
    • A dummy task for the end of DAG
  • Step 4 – Create dynamic tasks
    • Step 4a – Create the task using BashOperator
    • Step 4b – Define which is the preceding task and which is the next task
      • The logic defined here can be changed as per your requirements.
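
The fan-out in Step 4 can be sketched without Airflow itself. The snippet below is a hypothetical stand-in (not part of the DAG file) that builds the same task ids and the same start → task → end edges the loop produces, just to make the shape of the graph explicit.

```python
# Hypothetical stand-in for the loop in Step 4: one task per number,
# each wired between the start and end tasks.
def build_edges(n):
    edges = []
    for i in range(1, n + 1):
        task_id = 'task_t' + str(i)
        edges.append(('start', task_id))  # start_task >> t1
        edges.append((task_id, 'end'))    # t1 >> end_task
    return edges

for edge in build_edges(3):
    print(edge)
```

Note that the imports in the DAG above are Airflow 1.x style; in Airflow 2, BashOperator lives in airflow.operators.bash, and in newer 2.x releases DummyOperator has been replaced by EmptyOperator in airflow.operators.empty.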

DAG execution

This results in a DAG whose graph view fans out from start into task_t1, task_t2, and task_t3, all of which converge on end.

Airflow DAG – Dynamic Tasks – Example-2

Let’s take a slightly more complicated example. Assume there is an Airflow variable which stores a list of elements. The DAG will create a task for every element of the list. If you are unfamiliar with how to create Airflow variables, please refer to this blog entry.

Airflow Variable

Below is the Airflow variable that was created.
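
As a hypothetical stand-in for the variable shown in the screenshot, a JSON list value for v_list with six elements (matching the first run described later) might look like:

```json
["a", "b", "c", "d", "e", "f"]
```

Such a variable can be created from the Airflow UI under Admin → Variables or, in Airflow 2, via the CLI with `airflow variables set v_list '[...]'`.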

Airflow DAG

Below is the Airflow DAG which uses the above variable and creates dynamic tasks for all the elements in the list.

# Filename: hello_dynamic_tasks_eg2.py
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# Step 1 - Define the default arguments for DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 18),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Step 2 - Declare a DAG with default arguments
dag = DAG('hello_dynamic_tasks_eg2',
          schedule_interval='0 8 * * *',
          default_args=default_args,
          catchup=False
          )

# Step 3 - Declare dummy start and stop tasks
start_task = DummyOperator(task_id='start', dag=dag)
end_task = DummyOperator(task_id='end', dag=dag)

# Step 4 - Read the list of elements from the airflow variable
v_list = Variable.get("v_list", deserialize_json=True)

# Step 5 - Create dynamic tasks for each element of the list
for element in v_list:
    # Step 5a - Declare the task
    t1 = BashOperator(
        task_id='task_t' + str(element),
        bash_command='echo hello from task:' + str(element),
        dag=dag
    )
    # Step 5b - Define the sequence of execution of task
    start_task >> t1 >> end_task

Analyse Airflow DAG

Let’s analyse the code quickly

  • Step 1 – Declare a dictionary of default arguments for DAG
  • Step 2 – Create a DAG with the default arguments
  • Step 3 – Create two dummy tasks
    • A dummy task for beginning the DAG
    • A dummy task for the end of DAG
  • Step 4 – Read the airflow variable which stores the list
  • Step 5 – Create dynamic tasks
    • Step 5a – Create the task using BashOperator
    • Step 5b – Define which is the preceding task and which is the next task
      • The logic defined here can be changed as per your requirements.
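
The deserialize_json=True flag in Step 4 tells Variable.get to parse the stored string as JSON rather than return it raw; roughly, it behaves like json.loads on the stored value. A minimal sketch, using a hypothetical stored string:

```python
import json

# Hypothetical raw string, as it would sit in the Airflow metadata database
stored = '["a", "b", "c", "d"]'

# What deserialize_json=True does, roughly: parse the string into a Python list
v_list = json.loads(stored)

# The loop in Step 5 would then generate one task id per element
for element in v_list:
    print('task_t' + str(element))
```

Without deserialize_json=True, the loop would iterate over the characters of the raw string instead of the list elements.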

DAG Execution

This DAG is executed twice to show its dynamic nature, each time with a different number of list elements, changed by modifying the Airflow variable between runs:

  1. A list of 6 elements, as shown above.
  2. A list of 4 elements.

DAG Execution – 1

When the DAG is manually triggered, it creates the following graph:

Six elements in the airflow variable list

DAG Execution – 2

When the list is changed to 4 elements, the following DAG is generated:

Four elements in the airflow variable list

As you can see in the screenshot, only 4 tasks are generated and executed instead of 6. This works because the scheduler re-parses the DAG file at regular intervals, so the variable is re-read and the task list regenerated.

You can apply programmatic logic to dynamically create tasks and shape the DAG. Because the DAG-building logic is independent of the business/functional logic, the various tasks can be rearranged with ease, reducing turnaround time. This brings us to the end of this post on dynamic tasks. I hope you find it useful. If you like it, please do share it!

2 thoughts on “Airflow – Dynamic DAGs”

  1. How would you do it if you had n files in a filesystem path and you wanted to trigger a task for each of those files?
    Besides, the path is based on the execution_date: /tmp/2021-06-24/{file1.log,file2.log}, which is not available from the main DAG context.

