Airflow – External Task Sensor

The Airflow External Task Sensor deserves a blog entry of its own. It is a powerful Airflow feature that can help you sort out dependencies between DAGs in many use cases; in my view it is a must-have tool. This blog entry introduces the external task sensor and shows how you can quickly put it to work in your ecosystem.

Before you dive into this post: if this is the first time you are reading about sensors, I recommend reading the following entry first.

Why use External Task Sensor

Here is why I think an external task sensor is so useful. Most traditional scheduling is time-based: the dependencies between jobs rest on the assumption that the first job will definitely finish before the next job starts. But what happens if the first job fails, or is processing more data than usual and runs late? Then we have what is called a “data pipeline failure” (data engineering lingo 😉), because the next job is time-triggered and starts even though the first job has failed or has not finished. The problem grows as data pipelines become more and more complex, and it leads to a massive waste of human and infrastructure resources.

A better solution is for the dependent job to start only once it knows for certain that the first job has finished: some sort of event should trigger the next job. This is where the external task sensor helps.

External Task Sensor Concepts

In a nutshell, the external task sensor checks the state of a task instance in a different DAG (in Airflow lingo, an external task). If that task instance is in the state you want to sense, the DAG containing the sensor goes ahead and executes the task(s) that come next. You can also read more about external task sensors here.
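To make the idea of "checking the state" concrete, here is a minimal pure-Python sketch of what any sensor does: call a check repeatedly, sleeping poke_interval seconds between calls, until the external task reports an allowed state or the timeout is exceeded. This is not Airflow's implementation; wait_for_external_state and its get_state callback are hypothetical names, and the sleep function is injectable so the loop can be exercised without really waiting.

```python
import time
from typing import Callable, Iterable, Optional


def wait_for_external_state(
    get_state: Callable[[], Optional[str]],
    allowed_states: Iterable[str] = ("success",),
    poke_interval: float = 60,
    timeout: float = 180,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Hypothetical sensor loop: poke get_state() until it returns an allowed state.

    Returns how many pokes were needed. Raises TimeoutError once more than
    `timeout` seconds have been spent waiting (counted from the sleeps rather
    than a wall clock, to keep the sketch deterministic).
    """
    allowed = set(allowed_states)
    waited = 0.0
    pokes = 0
    while True:
        pokes += 1
        if get_state() in allowed:  # the "poke": is the external task done yet?
            return pokes
        if waited > timeout:
            raise TimeoutError(f"no allowed state after {pokes} pokes")
        sleep(poke_interval)
        waited += poke_interval


# Usage: a fake external task that is queued, then running, then successful.
states = iter(["queued", "running", "success"])
print(wait_for_external_state(lambda: next(states), sleep=lambda s: None))  # 3
```

Passing allowed_states=["failed"] turns the same loop into a failure detector, which mirrors how the real sensor can watch for states other than success.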

Let’s look at it in a little more detail. Suppose the use case is to detect whether a task in DAG A has executed successfully. A solution using an external task sensor is to create a DAG B with an external task sensor that detects the success state of the task in DAG A. No changes are required in DAG A, which I think is quite helpful. By default, the external task sensor checks for the success state, but you can just as easily check for failure or other states. The possible task instance states in Airflow 1.10.15 are listed below.

scheduled, skipped, upstream_failed, up_for_reschedule, up_for_retry, failed, success, running, queued, no_status
List of valid Airflow task instance states
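Because allowed_states takes a list of these state names, a small pre-flight check like the one below can catch a typo (such as "sucess") or a bare string before it reaches the sensor. This is an illustrative sketch, not part of Airflow: check_allowed_states and TASK_INSTANCE_STATES are hypothetical names, and the state names come from the list above.

```python
# States from the list above. "no_status" is left out on the assumption that
# it is only the UI label for a task instance with no state (None), so there
# is no state string for the sensor to match against.
TASK_INSTANCE_STATES = frozenset({
    "scheduled", "skipped", "upstream_failed", "up_for_reschedule",
    "up_for_retry", "failed", "success", "running", "queued",
})


def check_allowed_states(allowed_states):
    """Hypothetical validation of an allowed_states argument."""
    if isinstance(allowed_states, str):
        # A bare string is not a list of states; reject it early.
        raise TypeError("allowed_states must be a list, e.g. ['success']")
    states = list(allowed_states)
    unknown = sorted(set(states) - TASK_INSTANCE_STATES)
    if unknown:
        raise ValueError(f"unknown task instance state(s): {unknown}")
    return states


print(check_allowed_states(["failed"]))  # ['failed']
```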

The general concepts of how sensors work (poking at a regular interval until the condition is met or a timeout is reached) remain the same.

External Task Sensor Parameters

The external task sensor has six parameters of its own, in addition to the standard sensor arguments such as poke_interval and timeout. These are also documented here.

  • external_dag_id (string, required): the DAG id of the DAG that contains the task to be sensed.
  • external_task_id (string or None, optional, default None): the task id of the task (inside that DAG) to be monitored. If left as None, the external task sensor waits for the entire DAG run to finish.
  • allowed_states (list of strings, optional): the task states to sense. Defaults to ['success'].
  • execution_delta (timedelta, optional): the time difference with the previous execution to look at. It is subtracted from the sensor's own execution date, so pass a positive value. By default the sensor looks at the same execution_date as the current task or DAG. (This is discussed in more detail below.)
  • execution_date_fn (callable, optional): a function that receives the current execution date and returns the desired execution date(s) to query. (This is discussed in more detail below.)
  • check_existence (boolean, optional, default False): if True, the sensor first checks that the external DAG (and the external task, when external_task_id is set) exists, and fails immediately instead of waiting if it does not.
External task sensor parameters

We are really interested (a lot!!!) in the execution_delta and execution_date_fn parameters. There are three different scenarios in which an external task sensor can be used, and in each of them there are two DAGs:

  • Main DAG with the external task sensor
  • DAG which needs to be monitored
Note: You can use either execution_delta or execution_date_fn, but not both at the same time.
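How each option turns the sensor's own execution date into the date(s) it looks up can be summarized in a few lines of plain Python. This is a simplified re-implementation for illustration, not Airflow's source, and dates_to_query is a hypothetical name. It follows the behavior described above: execution_delta is subtracted from the current execution date, execution_date_fn is called with it, with neither the same execution date is used, and supplying both is an error.

```python
from datetime import datetime, timedelta
from typing import Callable, List, Optional, Union

DateOrDates = Union[datetime, List[datetime]]


def dates_to_query(
    execution_date: datetime,
    execution_delta: Optional[timedelta] = None,
    execution_date_fn: Optional[Callable[[datetime], DateOrDates]] = None,
) -> List[datetime]:
    """Hypothetical helper: which external execution date(s) would be checked."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("pass execution_delta or execution_date_fn, not both")
    if execution_delta is not None:
        result = execution_date - execution_delta  # positive delta looks back
    elif execution_date_fn is not None:
        result = execution_date_fn(execution_date)  # arbitrary custom logic
    else:
        result = execution_date  # same execution date as the sensor's own run
    # A single date is handled as a one-element list.
    return result if isinstance(result, list) else [result]


run = datetime(2021, 9, 25, 9, 3)
print(dates_to_query(run, execution_delta=timedelta(minutes=3)))
print(dates_to_query(run, execution_date_fn=lambda dt: dt - timedelta(minutes=3)))
```

Both calls print the same single date, which is exactly the equivalence Scenario#2 and Scenario#3 rely on.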

Usage Scenarios

Let’s look at the scenarios

  • Scenario#1 – Both DAGs have the same schedule and start at the same time.
  • Scenario#2 – Both DAGs have the same schedule, but different start times.
  • Scenario#3 – Both DAGs have the same schedule, but different start times, and computing the execution date is complex.
The parameter each scenario relies on, and how the scenarios compare:
  • Scenario#1 – neither parameter. Simple and easy, but with many drawbacks: in practice it is difficult to keep the DAG timings in sync.
  • Scenario#2 – execution_delta. You need to provide a timedelta object. Works for most business requirements.
  • Scenario#3 – execution_date_fn. Similar to Scenario#2, but very flexible: it lets you write complex logic to compute the execution date.
Note: In all three scenarios, we do not modify the DAG, or the task in the DAG, that we are trying to sense.

Scenario#1 – Both DAGs have the same schedule. Start at the same time

DAG with a task to monitor

Below is a simple DAG whose task we want to monitor using the external task sensor.

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 8, 29, 16, 00),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5)
}

# max_active_runs is a DAG-level argument; it has no effect inside default_args.
dag = DAG('cloudwalker_dag_with_task_to_be_sensed',
          schedule_interval='*/5 * * * *',  # every five minutes
          default_args=default_args,
          max_active_runs=1,
          catchup=False
          )


def my_processing_func(**kwargs):
    print("I am task which does something")


t1 = DummyOperator(
    task_id='some_arbitary_task',
    dag=dag
)


task_to_be_sensed = PythonOperator(
    task_id='task_to_be_sensed',
    python_callable=my_processing_func,
    dag=dag)


task_to_be_sensed >> t1

Now let us look at the DAG which has the external task sensor

DAG with External Task Sensor

Below is the DAG with the external task sensor. If you look at start_date in the default arguments, you will notice that both DAGs share the same start_date and the same schedule.

from datetime import datetime, timedelta
from airflow.models import DAG
# airflow.operators.sensors is a deprecated import path in Airflow 1.10.x;
# import the sensor from its own module instead.
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 8, 29, 16, 00),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5)
}

# max_active_runs is a DAG-level argument; it has no effect inside default_args.
dag = DAG('hello_external_task_sensor',
          schedule_interval='*/5 * * * *',  # same schedule as the monitored DAG
          default_args=default_args,
          max_active_runs=1,
          catchup=False
          )


external_task_sensor = ExternalTaskSensor(
    task_id='external_task_sensor',
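    poke_interval=60,   # check the external task every 60 seconds
    timeout=180,        # give up after 180 seconds of poking
    soft_fail=False,    # on timeout, fail this task instead of skipping it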
    retries=2,
    external_task_id='task_to_be_sensed',
    external_dag_id='cloudwalker_dag_with_task_to_be_sensed',
    dag=dag)


def my_processing_func(**kwargs):
    print("I have sensed the task is complete in a dag")


some_task = PythonOperator(
    task_id='some_task',
    python_callable=my_processing_func,
    dag=dag)

external_task_sensor >> some_task

Let’s look at the screenshots from Airflow to see what happens.

Output from DAG which had the task to be sensed is below

Log from the task_to_be_sensed is below

Log from the external task sensor is below
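Scenario#1 only works because both DAGs produce identical execution dates. The sketch below models two five-minute schedules as plain lists of datetimes (execution_dates is a hypothetical helper, and a fixed-interval schedule stands in for the cron expression) and shows that if the sensor DAG's runs fall just two minutes later, with no execution_delta, the sensor never finds a matching run to check.

```python
from datetime import datetime, timedelta


def execution_dates(start: datetime, interval: timedelta, count: int):
    """Hypothetical helper: first `count` execution dates of a fixed-interval schedule."""
    return [start + i * interval for i in range(count)]


every_5_min = timedelta(minutes=5)
monitored = execution_dates(datetime(2021, 8, 29, 16, 0), every_5_min, 4)
sensor_same = execution_dates(datetime(2021, 8, 29, 16, 0), every_5_min, 4)
sensor_shifted = execution_dates(datetime(2021, 8, 29, 16, 2), every_5_min, 4)

# Same start_date and schedule: every sensor run has a matching run to check.
print(all(d in monitored for d in sensor_same))
# Runs shifted by two minutes and no execution_delta: nothing ever matches.
print(any(d in monitored for d in sensor_shifted))
```

This is the practical drawback of Scenario#1: the timings of the two DAGs have to stay exactly in sync.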

Scenario#2 – Both DAGs have the same start date, same execution frequency but different trigger times.

This scenario is probably the most common one: both DAGs have the same start date and the same execution frequency, but different trigger times. For example, both jobs may run daily, one starting at 9 AM and the other at 10 AM. For this blog entry, we keep them 3 minutes apart. Everything else remains the same.

The start_date in the default arguments therefore stays the same in both DAGs, but the schedule_interval parameter changes. In addition, we add the execution_delta parameter to the external task sensor definition. The sensor subtracts execution_delta from its own execution date to compute the execution date of the task instance it checks in the monitored DAG.
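Here is the date arithmetic for this scenario worked out with Python's datetime module. It assumes the Airflow 1.10 convention that a run's execution_date is the start of its schedule interval, so the runs of '0 9 * * *' and '03 9 * * *' covering the same day are stamped 09:00 and 09:03 on the same date. The dates are illustrative, not taken from a real run.

```python
from datetime import datetime, timedelta

# Execution dates of the two daily runs covering the same day.
monitored_run = datetime(2021, 9, 25, 9, 0)   # DAG scheduled '0 9 * * *'
sensor_run = datetime(2021, 9, 25, 9, 3)      # DAG scheduled '03 9 * * *'

execution_delta = timedelta(minutes=3)
queried = sensor_run - execution_delta        # what the sensor looks up
print(queried)                   # 2021-09-25 09:00:00
print(queried == monitored_run)  # True

# Adding the delta instead of subtracting it would query a run that never exists.
print(sensor_run + execution_delta == monitored_run)  # False
```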

DAG with a task to monitor

Below is a simple DAG whose task we want to monitor using the external task sensor.

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 25, 8, 55),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5)
}

# max_active_runs is a DAG-level argument; it has no effect inside default_args.
dag = DAG('cloudwalker_dag_with_task_to_be_sensed',
          schedule_interval='0 9 * * *',  # daily at 09:00
          default_args=default_args,
          max_active_runs=1,
          catchup=False
          )


def my_processing_func(**kwargs):
    print("I am task which does something")


t1 = DummyOperator(
    task_id='some_arbitary_task',
    dag=dag
)


task_to_be_sensed = PythonOperator(
    task_id='task_to_be_sensed',
    python_callable=my_processing_func,
    dag=dag)


task_to_be_sensed >> t1

DAG with External Task Sensor

Now let us look at the DAG which has the external task sensor.

from datetime import datetime, timedelta
from airflow.models import DAG
# airflow.operators.sensors is a deprecated import path in Airflow 1.10.x;
# import the sensor from its own module instead.
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 25, 8, 55),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5)
}

# max_active_runs is a DAG-level argument; it has no effect inside default_args.
dag = DAG('hello_external_task_sensor',
          schedule_interval='03 9 * * *',  # daily at 09:03, 3 minutes after the monitored DAG
          default_args=default_args,
          max_active_runs=1,
          catchup=False
          )


external_task_sensor = ExternalTaskSensor(
    task_id='external_task_sensor',
    poke_interval=60,
    timeout=180,
    soft_fail=False,
    retries=2,
    external_task_id='task_to_be_sensed',
    external_dag_id='cloudwalker_dag_with_task_to_be_sensed',
    execution_delta=timedelta(minutes=3),  # the 09:03 run checks the 09:00 run
    dag=dag)


def my_processing_func(**kwargs):
    print("I have sensed the task is complete in a dag")


some_task = PythonOperator(
    task_id='some_task',
    python_callable=my_processing_func,
    dag=dag)

external_task_sensor >> some_task

Now once you deploy your DAGs – let’s look at the screenshots from Airflow

Output from DAG which had the task to be sensed is below

Log from the task_to_be_sensed is below

Now let’s look at the task from the external task sensor

Scenario#3 – Computing the execution date using complex logic

Finally, let’s look at the last scenario, which gives you complete flexibility in computing the execution date of the task to be sensed. Instead of passing a timedelta, we pass a function: it receives the sensor's execution date, applies whatever computation logic you need, and returns the execution date (or dates) for the external task sensor to check.

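For example, you might need to check every scheduled execution of a task during the last 24 hours, pick out the latest of several executions, or meet some other complex requirement. The function can return a single execution date or a list of them; with a list, the sensor only succeeds once the task instances for all of those dates are in an allowed state.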
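As an illustration of the kind of logic execution_date_fn allows, here are two hypothetical functions you could pass to the sensor; neither appears in the DAGs of this post. The first assumes the monitored DAG's relevant run is at 09:00 on the previous weekday and skips over weekends. The second returns a list of hourly execution dates, and with a list the sensor treats the check as passed only when the task instances for all of those dates are in an allowed state.

```python
from datetime import datetime, timedelta


def previous_weekday_at_nine(dt: datetime) -> datetime:
    """Hypothetical execution_date_fn: 09:00 on the last weekday before dt's date."""
    candidate = dt.replace(hour=9, minute=0, second=0, microsecond=0) - timedelta(days=1)
    while candidate.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        candidate -= timedelta(days=1)
    return candidate


def last_n_hours(dt: datetime, n: int = 3):
    """Hypothetical execution_date_fn: the n top-of-the-hour dates before dt."""
    top = dt.replace(minute=0, second=0, microsecond=0)
    return [top - timedelta(hours=h) for h in range(1, n + 1)]


# Monday 2021-09-27 maps back to Friday 2021-09-24 at 09:00.
print(previous_weekday_at_nine(datetime(2021, 9, 27, 9, 3)))
print(last_n_hours(datetime(2021, 9, 25, 16, 28)))
```

Either function would be passed by name, for example execution_date_fn=previous_weekday_at_nine.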

For this blog entry, we implement a simple function that emulates the execution_delta behavior, but through a function call instead.

DAG with a task to monitor

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 25, 16, 25),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5)
}

# max_active_runs is a DAG-level argument; it has no effect inside default_args.
dag = DAG('cloudwalker_dag_with_task_to_be_sensed',
          schedule_interval='25 16 * * *',  # daily at 16:25
          default_args=default_args,
          max_active_runs=1,
          catchup=False
          )


def my_processing_func(**kwargs):
    print("I am task which does something")


t1 = DummyOperator(
    task_id='some_arbitary_task',
    dag=dag
)


task_to_be_sensed = PythonOperator(
    task_id='task_to_be_sensed',
    python_callable=my_processing_func,
    dag=dag)


task_to_be_sensed >> t1

DAG with External Task Sensor

from datetime import datetime, timedelta
from airflow.models import DAG
# airflow.operators.sensors is a deprecated import path in Airflow 1.10.x;
# import the sensor from its own module instead.
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 25, 16, 25),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5)
}

# max_active_runs is a DAG-level argument; it has no effect inside default_args.
dag = DAG('hello_external_task_sensor',
          schedule_interval='28 16 * * *',  # daily at 16:28, 3 minutes after the monitored DAG
          default_args=default_args,
          max_active_runs=1,
          catchup=False
          )


external_task_sensor = ExternalTaskSensor(
    task_id='external_task_sensor',
    poke_interval=60,
    timeout=180,
    soft_fail=False,
    retries=2,
    external_task_id='task_to_be_sensed',
    external_dag_id='cloudwalker_dag_with_task_to_be_sensed',
    execution_date_fn=lambda dt: dt - timedelta(minutes=3),  # the 16:28 run checks the 16:25 run
    dag=dag)


def my_processing_func(**kwargs):
    print("I have sensed the task is complete in a dag")


some_task = PythonOperator(
    task_id='some_task',
    python_callable=my_processing_func,
    dag=dag)

external_task_sensor >> some_task

Now once you deploy your DAGs – let’s look at the screenshots from Airflow

Output from DAG which had the task to be sensed is below

Log from the task_to_be_sensed is below

Now let’s look at the task from the external task sensor

Wow, this brings us to the end of this very, very long post. I sincerely hope it helps you in your work with Airflow. If you like this post, please do share it. Till next time … bye!
