Airflow – Branching

Introduction

Branching is a useful concept when creating workflows. Simply put, it is a way to implement if-then-else logic in Airflow. For example, a task or group of tasks may need to run only when a particular condition is met. If the condition is true, one set of tasks is executed; if it is false, a different set is executed.

Branching is achieved with an Airflow operator called the BranchPythonOperator. To keep it simple: it is an operator that defines a task. That task calls a Python function whose only job is to implement the if-then-else logic and return to Airflow the task_id of the next task to execute.

Branching

Let’s take an example. We want to check whether the value of a variable is greater than or equal to 15. If it is, the task “greater_Than_equal_to_15” should be executed; if the value is less than 15, the task “less_Than_15” should be executed.
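Before wiring this into Airflow, the rule itself can be sketched as a plain Python function. This is only an illustration: choose_branch and its threshold parameter are hypothetical names, while the two task ids are the ones used in the DAG later in this post.

```python
# Task ids match the ones used in the DAG later in this post.
GREATER_TASK_ID = 'greater_Than_equal_to_15'
LESS_TASK_ID = 'less_Than_15'


def choose_branch(value, threshold=15):
    """Return the id of the task to run next for the given value."""
    # Variable.get returns a string by default, so convert before comparing.
    if int(value) >= threshold:
        return GREATER_TASK_ID
    return LESS_TASK_ID


print(choose_branch('15'))  # boundary value goes to the "greater or equal" branch
print(choose_branch('10'))
```

Note that the boundary value 15 belongs to the "greater than or equal" branch, which is why the comparison uses >= rather than >.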

Step 1 – Create a variable

Create an Airflow variable named v_val with a value of 15, for example from Admin > Variables in the Airflow web UI.

Step 2 – Create the DAG

Let’s create a Python file named hello_branching.py

# Filename: hello_branching.py
from airflow import DAG
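# Airflow 1.x import paths; in Airflow 2 these modules are
# airflow.operators.bash and airflow.operators.python
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator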
from datetime import datetime, timedelta
from airflow.models import Variable

# Step 1 - define the default parameters for the DAG
default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime(2019, 7, 20),
  'email': ['[email protected]'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),

}

p_val = Variable.get('v_val')  # Airflow Variable read by the branching method in Step 3

# Step 2 - Create a DAG object
dag = DAG(
  'hello_branching',
  schedule_interval='0 0 * * *',
  default_args=default_args
)

# Step 3 - Define the method to check the condition for branching
def my_check_condition(**kwargs):
  if int(p_val)>=15 :
    return 'greater_Than_equal_to_15'
  else:
    return 'less_Than_15'

# Step 4 - Create the branching task and register the method from Step 3 as its callable
checkTask = BranchPythonOperator(
  task_id='check_task',
  python_callable=my_check_condition, #Registered method
  provide_context=True,
  dag=dag
)

# Step 5 - Create tasks
greaterThan15 = BashOperator(
  task_id= 'greater_Than_equal_to_15',
  bash_command="echo value is greater than or equal to 15",
  dag=dag
)

lessThan15 = BashOperator(
  task_id= 'less_Than_15',
  bash_command="echo value is less than 15",
  dag=dag
)

# Step 6 - Define the sequence of tasks.
greaterThan15.set_upstream(checkTask)
lessThan15.set_upstream(checkTask)

Let’s analyse the code step by step

  • Step 1 – Defines the default arguments for the DAG and reads v_val into p_val, the value the check function tests
  • Step 2 – Creates a DAG object
  • Step 3 – Defines the check method
  • Step 4 – Creates the branching task and registers the callback function in it
  • Step 5 – Creates the remaining tasks
  • Step 6 – Sets the ordering of the tasks
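One detail worth noting about Step 1: Variable.get runs at module level, so the lookup happens every time Airflow parses the file. A possible variation, sketched here as an assumption rather than as part of the DAG above, is to do the lookup inside the callback so that it only happens when check_task runs. The make_check_condition helper and its get_variable parameter are hypothetical; get_variable stands in for Variable.get so the sketch runs without an Airflow installation.

```python
def make_check_condition(get_variable):
    """Build a branch callback that looks up v_val each time it is called.

    get_variable is a hypothetical stand-in for airflow.models.Variable.get.
    """
    def my_check_condition(**kwargs):
        # The lookup happens at call time, not when the module is imported.
        if int(get_variable('v_val')) >= 15:
            return 'greater_Than_equal_to_15'
        return 'less_Than_15'
    return my_check_condition


# A plain dict plays the role of the Airflow Variable store here.
fake_store = {'v_val': '15'}
check = make_check_condition(fake_store.get)

first = check()
fake_store['v_val'] = '10'  # change the value between two calls
second = check()
print(first, second)
```

In a real DAG file the same effect would come from calling Variable.get('v_val') inside my_check_condition instead of at the top of the module.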

Step 3 – Execute the DAG

When the DAG is executed with the variable v_val set to 15, check_task returns greater_Than_equal_to_15, so that task runs and less_Than_15 is skipped.

The output log of check_task records which branch was followed.

If v_val is set to 10 (which is less than 15), check_task returns less_Than_15, so that task runs and greater_Than_equal_to_15 is skipped.

Again, the output log of check_task records which branch was followed.
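The two runs can be modelled in a few lines of plain Python. This is a simplified sketch of the behaviour, not Airflow's own code: simulate_branch is a hypothetical helper that marks the task whose id was returned as run and every other downstream task as skipped.

```python
def simulate_branch(chosen_task_id, downstream_task_ids):
    """Map each downstream task id to 'run' or 'skipped'.

    A simplified model of branching, not Airflow's implementation.
    """
    if chosen_task_id not in downstream_task_ids:
        raise ValueError('unknown task id: %s' % chosen_task_id)
    return {
        task_id: 'run' if task_id == chosen_task_id else 'skipped'
        for task_id in downstream_task_ids
    }


downstream = ['greater_Than_equal_to_15', 'less_Than_15']
for v_val in (15, 10):
    # Same condition as my_check_condition in hello_branching.py.
    chosen = 'greater_Than_equal_to_15' if v_val >= 15 else 'less_Than_15'
    print(v_val, simulate_branch(chosen, downstream))
```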

Summary

There are various ways to implement the check condition, including reading from databases or file systems. Once the callback function is ready, it is only a matter of registering it with a BranchPythonOperator task.
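As one illustration of a file-system based check, the callback below branches on whether an input file exists and has content. It is a minimal sketch under assumptions: check_file_ready, its path parameter, and the task ids process_file and skip_processing are all hypothetical names that do not appear in the DAG above.

```python
from pathlib import Path


def check_file_ready(path, **kwargs):
    """Return a task id depending on whether the input file has content.

    'process_file' and 'skip_processing' are hypothetical task ids.
    """
    input_file = Path(path)
    # Branch to processing only when the file exists and is not empty.
    if input_file.is_file() and input_file.stat().st_size > 0:
        return 'process_file'
    return 'skip_processing'
```

In a DAG, the path could be supplied to the callback through the operator's op_kwargs argument, and the two task ids would have to match tasks placed downstream of the branching task.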

Next time we will look at how to join two Airflow branches. This brings us to the end of the post. I hope you find it useful. Till next time... byeeee!
