Airflow – Sub-DAGs

Workflows can quickly become quite complicated, which makes them difficult to understand. To reduce complexity, tasks can be grouped into smaller DAGs which are then called from a parent DAG. This makes DAGs easier to manage and maintain.

Creating a sub-DAG consists of two steps:

  1. Creating the actual sub-DAG and testing it.
  2. Creating a DAG which calls the sub-DAG created in Step 1.

For those of you wondering how to call a sub-DAG: it is easy using the SubDagOperator. In the next example, we will reuse one of our earlier examples as a sub-DAG and call it from another DAG.

Let’s first see the code of the sub-DAG. You will observe that it is very similar to a DAG from the previous entries, except that it is defined inside a function. See below.

# Filename: hello_sub_dag.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime, timedelta
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule

def my_sub_dag(parent_dag_id):
    # Step 1 - define the default parameters for the DAG
    default_args = {
      'owner': 'airflow',
      'depends_on_past': False,
      'start_date': datetime(2019, 7, 28),
      'email': ['vipin.chadha@gmail.com'],
      'email_on_failure': False,
      'email_on_retry': False,
      'retries': 1,
      'retry_delay': timedelta(minutes=5),

    }
   
    p_val = Variable.get('v_val')  # Variable passed to the registered method


    # Step 2 - Create a DAG object
    my_sub_dag = DAG(dag_id=parent_dag_id + '.' + 'my_sub_dag',
            schedule_interval='0 0 * * *',
            default_args=default_args
        )

    # Step 3 - Define the method to check the condition for branching
    def my_check_condition(**kwargs):
      if int(p_val)>=15 :
        return 'greater_Than_equal_to_15'
      else:
        return 'less_Than_15'

    # Step 4 - Create a Branching task to Register the method in step 3 to the branching API
    checkTask = BranchPythonOperator(
      task_id='check_task',
      python_callable=my_check_condition, #Registered method
      provide_context=True,
      dag=my_sub_dag
    )

    # Step 5 - Create tasks
    greaterThan15 = BashOperator(
      task_id= 'greater_Than_equal_to_15',
      bash_command="echo value is greater than or equal to 15",
      dag=my_sub_dag
    )

    lessThan15 = BashOperator(
      task_id= 'less_Than_15',
      bash_command="echo value is less than 15",
      dag=my_sub_dag
    )

    finalTask = BashOperator(
      task_id= 'join_task',
      bash_command="echo This is a join",
      trigger_rule=TriggerRule.ONE_SUCCESS,
      dag=my_sub_dag
    )


    # Step 6 - Define the sequence of tasks.
    lessThan15.set_upstream(checkTask)
    greaterThan15.set_upstream(checkTask)
    finalTask.set_upstream([lessThan15, greaterThan15])

    # Step 7 - Return the DAG
    return my_sub_dag
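The branching decision in Steps 3 and 4 is plain Python, so you can sanity-check it without running Airflow at all. Here is a minimal sketch; the hard-coded sample values stand in for the v_val Variable and are my own illustration:

```python
def my_check_condition(p_val):
    # Mirrors the BranchPythonOperator callable: it returns the
    # task_id of the branch that should run next.
    if int(p_val) >= 15:
        return 'greater_Than_equal_to_15'
    return 'less_Than_15'

print(my_check_condition('20'))  # greater_Than_equal_to_15
print(my_check_condition('7'))   # less_Than_15
```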

Before moving forward, a few important points:

  • The sub-DAG lives in a separate file in the same directory as the main DAG.
  • The sub-DAG has been implemented as a function that returns a DAG object.
  • The sub-DAG id needs to be in the format parent_dag_id.child_dag_id, where child_dag_id matches the SubDagOperator task_id; if not, you will get errors. Check out the dag_id in Step 2.
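The naming rule in the last bullet can be captured in a small helper. This is purely illustrative on my part, not part of the Airflow API:

```python
def subdag_id(parent_dag_id, child_task_id):
    # The sub-DAG's dag_id must be "<parent_dag_id>.<child_task_id>",
    # where child_task_id is the SubDagOperator's task_id in the parent.
    return parent_dag_id + '.' + child_task_id

print(subdag_id('hello_main_dag', 'my_sub_dag'))  # hello_main_dag.my_sub_dag
```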

Next, let’s create a DAG which will call our sub-DAG. See below.

# Filename: hello_main_dag.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime, timedelta
from hello_sub_dag import my_sub_dag

# Step 1 - define the default parameters for the DAG
default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime(2019, 7, 28),
  'email': ['vipin.chadha@gmail.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),

}

# Step 2 - Create a DAG object
dag = DAG('hello_main_dag',
        schedule_interval='0 0 * * *',
        default_args=default_args
    )

# Step 3 - Create tasks
some_task = BashOperator(
  task_id= 'someTask',
  bash_command="echo I am some task",
  dag=dag
)

sub_dag = SubDagOperator(
    subdag=my_sub_dag('hello_main_dag'),
    task_id='my_sub_dag',
    dag=dag
)

final_task = BashOperator(
  task_id= 'finalTask',
  bash_command="echo I am the final task",
  dag=dag
)

# Step 4 - Define the sequence of tasks.
sub_dag.set_upstream(some_task)
final_task.set_upstream(sub_dag)

If you go through the code you will find the following:

  • The sub-DAG Python file has been imported like any other file/package.
  • The SubDagOperator in Step 3 calls the function defined in the sub-DAG Python file.

The above code results in the following DAG.

You can click on the sub-DAG, which is the box filled with gray and labeled in white text.

If you click on Zoom into Sub DAG in the above picture, it will open the sub-DAG for us. See below.

When executed, the main DAG runs the sub-DAG as a single task: the SubDagOperator launches the tasks inside the sub-DAG, and control is passed back to the main DAG once the sub-DAG finishes.
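One execution detail worth noting: because the branch operator skips one of the two branches, the join task uses TriggerRule.ONE_SUCCESS so it fires as soon as either branch succeeds. Roughly, the rule behaves like this sketch (my own illustration, not Airflow's actual implementation):

```python
def one_success(upstream_states):
    # ONE_SUCCESS: run the task if at least one upstream task succeeded,
    # even when the other upstream tasks were skipped.
    return any(state == 'success' for state in upstream_states)

print(one_success(['skipped', 'success']))  # True
print(one_success(['skipped', 'skipped']))  # False
```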

This brings us to the end of this blog entry. Hope you found it useful.
