Introduction
Many workflows require different branches (see the previous blog entry on branching). However, these branches can also join back together and execute a common task, similar to the picture below.

No special operator is needed for a join; it is achieved purely by the way we assign and set the upstream and downstream relationships between the tasks. An example of this is shown below.
Branch Join DAG
The hello_joins example extends the DAG from the previous blog entry. An additional task, quite imaginatively named join_task, is added to the DAG.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime, timedelta
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule
# Step 1 - define the default parameters for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 20),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
p_val = Variable.get('v_val')  # Variable passed to the registered method
# Step 2 - Create a DAG object
dag = DAG(
    'hello_joins',
    schedule_interval='0 0 * * *',
    default_args=default_args
)
# Step 3 - Define the method to check the condition for branching
def my_check_condition(**kwargs):
    if int(p_val) >= 15:
        return 'greater_Than_equal_to_15'
    else:
        return 'less_Than_15'
# Step 4 - Create a Branching task to Register the method in step 3 to the branching API
checkTask = BranchPythonOperator(
    task_id='check_task',
    python_callable=my_check_condition,  # Registered method
    provide_context=True,
    dag=dag
)
# Step 5 - Create tasks
greaterThan15 = BashOperator(
    task_id='greater_Than_equal_to_15',
    bash_command="echo value is greater than or equal to 15",
    dag=dag
)
lessThan15 = BashOperator(
    task_id='less_Than_15',
    bash_command="echo value is less than 15",
    dag=dag
)
joinTask = BashOperator(
    task_id='join_task',
    bash_command="echo This is a join",
    trigger_rule=TriggerRule.ONE_SUCCESS,
    dag=dag
)
# Step 6 - Define the sequence of tasks.
lessThan15.set_upstream(checkTask)
greaterThan15.set_upstream(checkTask)
joinTask.set_upstream([lessThan15, greaterThan15])
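As an aside, the Step 6 wiring can equivalently be written with Airflow's bitshift operators (`>>` / `<<`), which BaseOperator has supported for a while. The Airflow-free sketch below illustrates the idea; `FakeTask` is a hypothetical stand-in for BaseOperator, not Airflow code:

```python
# Hypothetical stand-in for Airflow's BaseOperator, only to illustrate that
# set_upstream calls and the >> operator express the same dependencies.
class FakeTask:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def set_upstream(self, others):
        others = others if isinstance(others, list) else [others]
        for o in others:
            self.upstream.add(o.task_id)

    def __rshift__(self, other):    # self >> other
        other.set_upstream(self)
        return other

    def __rrshift__(self, others):  # [a, b] >> self
        self.set_upstream(list(others))
        return self

check = FakeTask('check_task')
lt = FakeTask('less_Than_15')
gt = FakeTask('greater_Than_equal_to_15')
join = FakeTask('join_task')

# Same wiring as Step 6, written with bitshift operators:
check >> lt
check >> gt
[lt, gt] >> join
```

In real Airflow, `checkTask >> lessThan15`, `checkTask >> greaterThan15` and `[lessThan15, greaterThan15] >> joinTask` would produce the same graph as the set_upstream calls above.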
Understand DAG
Let’s look at the changes introduced in the DAG.
- Step 5 – A new task called join_task was added. Observe the trigger_rule argument that has been added. It is set to TriggerRule.ONE_SUCCESS, which means that if any one of the preceding tasks has succeeded, join_task should be executed.
- Step 6 – Adds the upstream dependencies of join_task, i.e. defines when it should be executed.
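The trigger rule matters here because, after branching, one parent of join_task is always skipped. The plain-Python sketch below captures the decision logic of the two relevant rules (it is an illustration, not Airflow's actual implementation):

```python
# Decision logic of two Airflow trigger rules, sketched in plain Python.
def all_success(upstream_states):
    # Default rule: every upstream task must have succeeded.
    return all(s == 'success' for s in upstream_states)

def one_success(upstream_states):
    # TriggerRule.ONE_SUCCESS: at least one upstream success is enough.
    return any(s == 'success' for s in upstream_states)

# After branching, one parent succeeds and the other is skipped:
states = ['success', 'skipped']
print(all_success(states))  # False - the default rule would never run join_task
print(one_success(states))  # True  - ONE_SUCCESS lets join_task fire
```

This is why join_task needs ONE_SUCCESS: under the default all_success rule it would wait on the skipped branch forever.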
Execute DAG
Let’s execute the hello_joins DAG and see the output.

Observe how the branch less_Than_15 was skipped, and the common task join_task was executed after the greater_Than_equal_to_15 task. Let’s change the value of the variable v_val to 10 and see the execution of the DAG.

This time the branch less_Than_15 is executed, and after it the common task join_task is executed again.
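The two runs above can be reproduced with a small Airflow-free simulation. The helper `run_hello_joins` below is hypothetical; it simply mirrors the branch condition from Step 3 and the ONE_SUCCESS join from Steps 5 and 6:

```python
# Airflow-free simulation of the hello_joins DAG (hypothetical helper).
def run_hello_joins(v_val):
    # Step 3: the branch callable picks exactly one downstream task_id.
    branch = 'greater_Than_equal_to_15' if int(v_val) >= 15 else 'less_Than_15'
    executed = ['check_task', branch]
    # The branch that was not chosen ends up in the 'skipped' state.
    parent_states = [
        'success' if t == branch else 'skipped'
        for t in ('greater_Than_equal_to_15', 'less_Than_15')
    ]
    # Steps 5-6: join_task fires under ONE_SUCCESS - any successful parent.
    if any(s == 'success' for s in parent_states):
        executed.append('join_task')
    return executed

print(run_hello_joins(20))  # ['check_task', 'greater_Than_equal_to_15', 'join_task']
print(run_hello_joins(10))  # ['check_task', 'less_Than_15', 'join_task']
```

Either way, join_task runs exactly once per DAG run, which is the behavior seen in the two executions above.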
Hope this gives you a good idea of how Airflow branch joins work. In the next entry, we will see how to create Airflow Sub-DAGs.
Till next time …..byeeeeee!!!