Introduction
Over the last couple of months I have received a lot of queries around dynamic DAGs, so here is a post on dynamic tasks in Airflow. Dynamic tasks are arguably one of the best features of Airflow: they allow you to create tasks dynamically inside an Airflow DAG. A simple use case: you want to launch a shell script once for each parameter in a list, all at the same time, and the list size changes all the time. This is where Airflow really shines. Let’s see how.
If you are new to Airflow, read these posts first to get some background before you jump into this one.
Airflow DAG – Dynamic Tasks – Example-1
Creating dynamic tasks in a DAG is pretty simple. For the purpose of this section, we will create dynamic tasks based on a range of numbers, just to illustrate the point. Let’s take a look at the DAG below.
Airflow DAG
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# Step 1 - Define the default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 18),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Step 2 - Declare a DAG with the default arguments
dag = DAG('hello_dynamic_tasks_eg1',
          schedule_interval='0 8 * * *',
          default_args=default_args,
          catchup=False
          )

# Step 3 - Declare dummy start and end tasks
start_task = DummyOperator(task_id='start', dag=dag)
end_task = DummyOperator(task_id='end', dag=dag)

# Step 4 - Create dynamic tasks for a range of numbers
for i in range(1, 4):
    # Step 4a - Declare the task
    t1 = BashOperator(
        task_id='task_t' + str(i),
        bash_command='echo hello from task:' + str(i),
        dag=dag
    )
    # Step 4b - Define the sequence of execution of tasks
    start_task >> t1 >> end_task
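Before analysing the code, a quick sanity check: importing the DAG file and listing the task ids shows exactly what the loop generated. A minimal sketch, assuming the file above is saved as hello_dynamic_tasks_eg1.py somewhere on the Python path:

# Sketch: list the task ids generated by the loop above
from hello_dynamic_tasks_eg1 import dag

print(sorted(task.task_id for task in dag.tasks))
# Expected output: ['end', 'start', 'task_t1', 'task_t2', 'task_t3']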
Analyse Airflow DAG
Let’s analyse the code quickly
- Step 1 – Declare a dictionary of default arguments for the DAG
- Step 2 – Create a DAG with the default arguments
- Step 3 – Create two dummy tasks
  - A dummy task for the beginning of the DAG
  - A dummy task for the end of the DAG
- Step 4 – Create dynamic tasks
  - Step 4a – Create the task using BashOperator
  - Step 4b – Define which is the preceding task and which is the next task
  - The logic defined here can be changed as per your requirements – for instance, chaining the tasks sequentially, as in the sketch below.
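If the tasks should run one after another instead of in parallel, the loop can carry the previous task forward. A minimal sketch, replacing Step 4 above:

# Sketch: chain the dynamic tasks sequentially instead of in parallel
previous_task = start_task
for i in range(1, 4):
    t1 = BashOperator(
        task_id='task_t' + str(i),
        bash_command='echo hello from task:' + str(i),
        dag=dag
    )
    previous_task >> t1
    previous_task = t1
previous_task >> end_task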
DAG Execution
This results in a DAG which looks like this: start, followed by task_t1, task_t2 and task_t3 in parallel, followed by end.
Airflow DAG – Dynamic Tasks – Example-2
Let’s take a slightly more complicated example. Assume there is an Airflow variable which stores a list of elements. The Airflow DAG will create a task for every element of the list. If you are unfamiliar with how to create Airflow variables, please refer to this blog entry.
Airflow Variable
Below is the Airflow variable that was created.
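If you prefer to create the variable programmatically instead of through the UI, the Variable model can do it. A minimal sketch, to be run where Airflow and its metadata database are available; the element values here are illustrative:

from airflow.models import Variable

# Sketch: store a JSON list of 6 elements under the key "v_list"
# (the element values are illustrative)
Variable.set("v_list", ["a", "b", "c", "d", "e", "f"], serialize_json=True)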
Airflow DAG
Below is the airflow DAG which utilizes the above variable and creates dynamic tasks for all the elements in the list.
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# Step 1 - Define the default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 18),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Step 2 - Declare a DAG with the default arguments
dag = DAG('hello_dynamic_tasks_eg2',
          schedule_interval='0 8 * * *',
          default_args=default_args,
          catchup=False
          )

# Step 3 - Declare dummy start and end tasks
start_task = DummyOperator(task_id='start', dag=dag)
end_task = DummyOperator(task_id='end', dag=dag)

# Step 4 - Read the list of elements from the Airflow variable
v_list = Variable.get("v_list", deserialize_json=True)

# Step 5 - Create dynamic tasks for each element of the list
for element in v_list:
    # Step 5a - Declare the task
    t1 = BashOperator(
        task_id='task_t' + str(element),
        bash_command='echo hello from task:' + str(element),
        dag=dag
    )
    # Step 5b - Define the sequence of execution of tasks
    start_task >> t1 >> end_task
Analyse Airflow DAG
Let’s analyse the code quickly
- Step 1 – Declare a dictionary of default arguments for the DAG
- Step 2 – Create a DAG with the default arguments
- Step 3 – Create two dummy tasks
  - A dummy task for the beginning of the DAG
  - A dummy task for the end of the DAG
- Step 4 – Read the Airflow variable which stores the list
- Step 5 – Create dynamic tasks
  - Step 5a – Create the task using BashOperator
  - Step 5b – Define which is the preceding task and which is the next task
  - The logic defined here can be changed as per your requirements – see the note below on guarding against a missing variable.
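One small hardening worth considering: if the variable may not exist yet, Variable.get accepts a default, so the DAG still parses (with no dynamic tasks) instead of failing with an import error. A minimal sketch:

# Sketch: fall back to an empty list if "v_list" is not defined
v_list = Variable.get("v_list", default_var=[], deserialize_json=True)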
DAG Execution
This DAG is executed twice to show its dynamic nature, each time with a different number of list elements, achieved by modifying the Airflow variable between runs.
- First run – the Airflow variable consists of a list of 6 elements, as shown above.
- Second run – the variable is modified to a list of 4 elements (see the sketch below for updating the variable).
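The variable can be changed between runs from the UI or programmatically, using the same Variable API as before. A minimal sketch, again with illustrative element values:

from airflow.models import Variable

# Sketch: shrink the list to 4 elements before the second run
# (the element values are illustrative)
Variable.set("v_list", ["a", "b", "c", "d"], serialize_json=True)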
DAG Execution – 1
When the DAG is manually triggered, it creates the following graph:

DAG Execution – 2
When the list is changed to 4 elements

The following DAG is generated
As you can see in the screenshot, only 4 tasks are generated and executed instead of 6.
You can apply programmatic logic to dynamically create both tasks and DAG structure. Because the DAG structure is independent of the business/functional logic, the various tasks can be moved around with substantial ease, reducing turnaround time. This brings us to the end of this post on dynamic tasks. I hope you find it useful. If you like it, please do share it!
Comments
Q: How would you do it if you had n files in a filesystem path and you wanted to trigger a task for each of those files in that path? Additionally, the path is based on the execution_date, e.g. /tmp/2021-06-24/{file1.log,file2.log}, which is not available from the main DAG context.
Q: Hi, can we create a DAG using a database, where the values are stored in a DB?
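On the first question: the same loop pattern works over a directory listing, with one caveat – this code runs when the DAG file is parsed, so the path cannot depend on execution_date; a per-run path has to be resolved inside a task at runtime instead. A minimal sketch for a fixed path (/tmp/input is illustrative):

import glob
import os

# Sketch: create one task per file found in a fixed directory.
# Note: this runs at DAG-parse time, so the directory cannot vary
# per run; execution_date-based paths must be handled inside a task.
for file_path in glob.glob('/tmp/input/*.log'):
    task_suffix = os.path.basename(file_path).replace('.', '_')
    t1 = BashOperator(
        task_id='task_' + task_suffix,
        bash_command='echo processing ' + file_path,
        dag=dag
    )
    start_task >> t1 >> end_task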