Airflow – Sensors

Estimated reading time: 9 minutes

Airflow sensors are like operators but perform a special task in an airflow DAG. They check for a particular condition at regular intervals and, when it is met, they pass control to the downstream tasks in the DAG. There are various types of sensors, and in this mini blog series we intend to explore a few of them.

Before you read further: in case you are just beginning to learn airflow, do have a look at these blog posts.

Introduction

The fastest way to learn how to use an airflow sensor is to look at an example. In this blog post, we will be looking at an example that uses the S3KeySensor to read a file as soon as it arrives in S3. There are other sensors available as well. Some of them are

  • S3 Key Sensor
  • SQL Sensor
  • HTTP Sensor
  • HDFS Sensor
  • Hive Sensor …. and many more.

There are a lot of sensors available. You can see the list of all pre-built sensors on this link. There are additional contributions that provide some really interesting sensors; these are available on this link. Have a look at these and be pleasantly surprised. Oh BTW, if none of these suit your needs – well, you can build one as well. The base class is available on this link.
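To give a feel for how little that involves, below is a minimal sketch of a custom sensor (the class name, file path and import paths are my own illustration, and the exact import path differs between Airflow versions). A custom sensor only has to implement a poke method that returns True when its condition is met and False otherwise – the waiting, retrying and timing out all come from the base class.

import os

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class MyFileSensor(BaseSensorOperator):
    """Illustrative sensor: waits for a file to appear on the local filesystem."""

    @apply_defaults
    def __init__(self, filepath, *args, **kwargs):
        super(MyFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # Called once per poke interval; True means the condition is met,
        # False means "not yet, try again later".
        print("Poking for {}".format(self.filepath))
        return os.path.exists(self.filepath)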

Sensor Concepts

All sensors perform a specific check. However, they also share a lot of common features – as you would have guessed from the fact that there is a base class for sensors. Let us look at these common features before looking at specific sensors. I would also suggest you have a look at the base sensor operator. It has four properties which are of interest to us.

  • Mode – There are two modes.
    • Poke – this is the default.
    • Reschedule

They both perform the same function but there is a difference in the way airflow allocates resources.

When set to poke the sensor is taking up a worker slot for its whole execution time and sleeps between pokes. Use this mode if the expected runtime of the sensor is short or if a short poke interval is required. Note that the sensor will hold onto a worker slot and a pool slot for the duration of the sensor’s runtime in this mode. When set to reschedule the sensor task frees the worker slot when the criteria is not yet met and it’s rescheduled at a later time. Use this mode if the time before the criteria is met is expected to be quite long. The poke interval should be more than one minute to prevent too much load on the scheduler.

– Airflow Documentation
  • Soft fail – defines what happens when the sensor fails. If set to False the sensor is allowed to retry; if set to True the task is marked as skipped on failure. If you want the sensor to retry, just make sure it is set to False, which is the default ;).
  • Retries – the number of times Airflow will attempt the sensor task.
  • Poke interval – the duration the sensor waits before it pokes again within an attempt. It is set in seconds and has a default value of 60 seconds.
  • Timeout – the duration after which a sensor task attempt will fail. By default, this value is set to 7 days. All four of these are shown together in the sketch just after this list.
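As a quick illustration (reusing the hypothetical MyFileSensor sketched earlier, and assuming a dag object already exists), all four of these are plain constructor arguments, because they come from the common sensor base class:

wait_for_file = MyFileSensor(
    task_id='wait_for_trigger_file',
    filepath='/tmp/landing/trigger.txt',  # illustrative path
    mode='reschedule',    # free the worker slot between pokes
    soft_fail=False,      # fail (and retry) rather than skip on timeout
    retries=2,            # number of times the sensor task is retried
    poke_interval=60,     # seconds between pokes within an attempt
    timeout=180,          # seconds before an attempt gives up
    dag=dag)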

IMPORTANT – How do poke interval, retries and timeout work together?

This is something which is not apparent from the airflow documentation unless you actually go through the code. So here is an example; it is applicable to all sensors irrespective of their type.

Let’s say we have set the following

  • Number of retries = 2
  • Poke Interval = 60 seconds
  • Timeout = 180 seconds

You will see that Airflow will attempt to execute the sensor task 2 times. In each attempt, it will poke a maximum of 4 times: at the start (0 secs) and then at 60 secs, 120 secs, and 180 secs. If you understand this you have pretty much cracked airflow sensors. The rest is all sensor-specific knowledge.
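If it helps, here is a rough, simplified sketch (not the actual Airflow source) of what a single sensor attempt boils down to in poke mode; the retries setting then simply reruns this whole attempt:

import time


def run_one_sensor_attempt(poke, poke_interval=60, timeout=180):
    # Simplified model of one attempt in poke mode: poke at 0s, 60s, 120s, ...
    # until poke() returns True or the attempt exceeds its timeout.
    started_at = time.time()
    while not poke():
        if (time.time() - started_at) > timeout:
            # Airflow raises a sensor timeout here; if retries are left,
            # the scheduler will start another attempt.
            raise RuntimeError("Sensor attempt timed out")
        time.sleep(poke_interval)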

S3 Key Sensor

The S3 Key Sensor, as the name suggests, checks the availability of files (a.k.a. keys) placed in an S3 bucket. The sensor can be set to check every few seconds or minutes for a key. When the DAG is running, it will check whether the key is available or not. If the key is available, control is passed to the next task in the DAG and the flow continues. If the key is not available, the sensor will fail or retry (depending upon the configuration).

To enable the S3 Key Sensor, airflow needs to have access to S3. That means, to run what comes next, you need to have an S3 connection.
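How you create that connection is up to you – the Airflow UI and the airflow connections CLI both work. As a rough sketch (the credentials below are placeholders and the exact fields differ a little between Airflow versions), the connection used later in this post, customer1_demo_s3, could also be registered programmatically:

from airflow import settings
from airflow.models import Connection

# Placeholder credentials – use your own, or rely on an IAM role / instance profile
s3_conn = Connection(
    conn_id='customer1_demo_s3',
    conn_type='aws',
    login='YOUR_AWS_ACCESS_KEY_ID',
    password='YOUR_AWS_SECRET_ACCESS_KEY')

session = settings.Session()
session.add(s3_conn)
session.commit()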

Let’s look at a simple but often used use-case – we want to execute the DAG once a file has landed in our S3 bucket. Let’s just say that the DAG needs to take some really complicated steps and the client does not want to use AWS Lambdas ;). Our DAG needs to check every 1 min whether the file has arrived or not, and it should time out after 180 seconds.

Below is an example of a DAG which uses an S3 key sensor to achieve the above use case. We will also see what happens when it gets executed.

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.sensors import S3KeySensor
from airflow.operators.python_operator import PythonOperator
import boto3

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 4, 25),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('cloudwalker_s3_sensor',
          schedule_interval='@daily',
          default_args=default_args,
          max_active_runs=1,
          catchup=False
          )

# Bucket and key the sensor waits for
s3_bucketname = 'demo2-s3-sensor'
s3_loc = 'demo/mytestfile.txt'

s3_sensor = S3KeySensor(
    task_id='s3_check_if_file_present',
    poke_interval=60,    # poke every 60 seconds within an attempt
    timeout=180,         # give up on an attempt after 180 seconds
    soft_fail=False,     # fail (and retry) instead of skipping
    retries=2,           # retry the sensor task twice
    bucket_key=s3_loc,
    bucket_name=s3_bucketname,
    aws_conn_id='customer1_demo_s3',
    dag=dag)


def my_processing_func(**kwargs):
    print("I am reading the file")
    s3 = boto3.client('s3')
    obj = s3.get_object(Bucket=s3_bucketname, Key=s3_loc)
    lines = obj['Body'].read().decode("utf-8")
    print(lines)


some_task = PythonOperator(
    task_id='some_task_using_file_found',
    python_callable=my_processing_func,
    dag=dag)

s3_sensor >> some_task

The above DAG tries to do some pretty simple stuff.

  1. Sensor – tries to check (or sense) whether a file (also called an S3 key) is present in S3 or not.
  2. Task – if the file is detected in S3, execute a simple python function.

Let’s look at logs of two scenarios.

  1. The file is NOT present in the S3 bucket.
  2. The file is present in the S3 bucket.

Scenario #1 – File NOT present in S3 bucket

Below is the screenshot of the empty s3 bucket

Empty S3 bucket

The log below shows how airflow does 2 retries, each retry lasting 180 seconds. In each attempt, the poke is done every 60 seconds, giving a total of four pokes per attempt. But because the file is not there, it times out and fails.

S3 key sensor log output

Observe how in the above log the poke is done every 60 seconds, and how on both attempts it finally times out saying it cannot find the file.

Scenario #2 – File is present in S3 bucket

Below is the screenshot of the bucket with the file.

S3 bucket with the file
DAG execution success

The log below shows how the file is detected by the S3 key sensor.

S3 key sensor output log

Below is the log of the subsequent task, which was executed once the file was detected.

DAG task output log

There are various types of sensors that are similar to the S3 key sensor. For example

  • HDFS file sensor
  • File Sensor

I am sure if you search the airflow documentation you will find sensors for Google Cloud Storage and Azure storage as well.
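For instance, here is a minimal sketch using the File Sensor (the Airflow 1.10 contrib import path is shown; in Airflow 2 it lives under airflow.sensors.filesystem – the connection id and path are placeholders, and a dag object is assumed as in the examples above):

from airflow.contrib.sensors.file_sensor import FileSensor

wait_for_local_file = FileSensor(
    task_id='wait_for_local_file',
    fs_conn_id='fs_default',                  # a File (path) connection defined in Airflow
    filepath='/data/landing/mytestfile.txt',  # placeholder path
    poke_interval=60,
    timeout=180,
    dag=dag)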

SQL Sensor

Let’s turn our attention to another sensor: the SQL sensor. As you might have guessed, it has to do with databases. The SQL Sensor requires a few parameters that are specific to it.

  • Connection Id
  • SQL String

See the DAG below. Specifically, it has an SQL sensor task that polls a particular table, and if data is returned successfully it will execute the next task in the DAG. So it is very similar to the S3 key sensor, but the trigger mechanism is now different. The DAG is designed to do the following tasks

  • Check if there is data in a particular table
  • If successful, execute the next task, which is a python operator.
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.sensors.sql_sensor import SqlSensor
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 4, 25),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('cloudwalker_sql_sensor',
          schedule_interval='@daily',
          default_args=default_args,
          max_active_runs=1,
          catchup=False
          )

sql_sensor = SqlSensor(
    task_id='sql_sensor_check',
    poke_interval=60,
    timeout=180,
    soft_fail=False,
    retries=2,
    sql="select count(*) from public.emp",  # pokes until this returns a non-zero value
    conn_id='my_db_conn',
    dag=dag)


def my_processing_func(**kwargs):
    print("******* Execution *********")
    print("I am doing some processing")


some_task = PythonOperator(
    task_id='some_task',
    python_callable=my_processing_func,
    dag=dag)

sql_sensor >> some_task

To execute this code you would need to create a connection in airflow. You can learn more about it at this link.

Let’s see the output of the DAG when the query returns successfully.

DAG with SQL sensor

The SQL sensor sees that some data has been returned. See below.

SQL sensor log

Once the sensor has triggered successfully the subsequent tasks can be executed.

Task execution log

There are many sensors and it would be virtually impossible to cover all of them. Hopefully, this blog entry has given you enough insight to get started with airflow sensors. If you like the blog entry do share it – till next time byeeeeee!!!!
