Introduction
I came across this interesting feature for managing SLAs natively in Airflow. Failures can happen not only when a task/pipeline actually fails but also when one runs slowly. A slow-running task/pipeline may cause downstream tasks or DAGs which depend on it to fail. That thought got me searching, and I decided to write a post about Airflow and SLA management. What I found was a simple solution which can help manage both failures and delays.
Before you jump into understanding how to manage SLAs in Airflow, make sure you are familiar with how to create Airflow DAGs.
What are SLAs in Airflow?
An SLA is a time duration defined on a task. When SLAs are enabled, Airflow monitors the performance of the task/DAG. When an SLA is breached, Airflow can trigger the following
- An email notification
- An action via a callback function call
Note 1: SLA checks occur only for scheduled task runs, not manually triggered ones. Strange!
Note 2: SLA monitoring starts from the scheduled time at which the DAG was to be triggered, not from when the task actually starts.
Note 3: There can be only one sla_miss_callback function per DAG; it handles SLA misses at both the task and the DAG level.
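To make Note 2 concrete: the SLA deadline is simply the scheduled trigger time plus the SLA duration, regardless of when the task actually starts running. A minimal sketch (the times here are illustrative, not from a real run):

```python
from datetime import datetime, timedelta

# Assumed scheduled trigger time for a DAG run (illustrative)
scheduled_time = datetime(2020, 12, 14, 10, 0, 0)
sla = timedelta(seconds=5)

# The SLA clock starts at the scheduled time, not the actual start time
deadline = scheduled_time + sla
print(deadline)  # 2020-12-14 10:00:05
```

If the task is still running (or has not even started) past this deadline, Airflow records an SLA miss.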
Defining SLAs in Airflow is done in three simple steps
- Step 1 – Define a callback method
- Step 2 – Pass the callback method to DAG
- Step 3 – Define the SLA duration on task(s)
Define a callback method
Here is an example of a simple callback function. Keep the signature compatible with the arguments Airflow passes in; using *args, **kwargs as shown keeps it flexible.
def cw_sla_missed_take_action(*args, **kwargs):
    # Logs to scheduler logs. Not in the scheduler UI.
    # Location $AIRFLOW_HOME/logs/scheduler/
    logger.info("************************************* SLA missed! ***************************************")
    logger.info(args)
This function simply writes a log entry to the scheduler log, but your callback could do a lot more, such as logging a ticket with ServiceNow, sending a Slack notification, or something much more complex.
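As an example of the Slack route, the helper below only builds the incoming-webhook payload; the function name and message format are my own illustration, not part of Airflow, and the actual HTTP POST to a webhook URL is left out:

```python
import json

def build_sla_slack_payload(dag_id, task_list):
    """Build a Slack incoming-webhook payload for an SLA miss (no network call here)."""
    text = f":warning: SLA missed in DAG `{dag_id}` for task(s): {task_list}"
    return json.dumps({"text": text})

payload = build_sla_slack_payload("hello_sla_tasks", "sleep_for_10s")
print(payload)
```

Inside an sla_miss_callback you would POST this payload to your webhook URL, for example with urllib.request or the requests library.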
Pass the callback method to DAG object
Here is an example
dag = DAG('hello_sla_tasks',
          schedule_interval='* * * * *',
          default_args=default_args,
          catchup=False,
          sla_miss_callback=cw_sla_missed_take_action  # callback function
          )
The last argument, sla_miss_callback, is where the reference to the callback method is passed to the DAG object. Nothing fancy at all so far!
Define SLA on task(s)
Finally, we come to the point where you define the SLA for a task. Keep in mind that defining an SLA is optional for a task, so in a DAG with multiple tasks there may be some tasks with SLAs and some without. See the complete DAG definition below.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
def cw_sla_missed_take_action(*args, **kwargs):
    # Logs to scheduler logs. Not in the scheduler UI.
    # Location $AIRFLOW_HOME/logs/scheduler/
    logger.info("************************************* SLA missed! ***************************************")
    logger.info(args)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 14),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('hello_sla_tasks',
          schedule_interval='* * * * *',
          default_args=default_args,
          catchup=False,
          sla_miss_callback=cw_sla_missed_take_action  # callback function
          )

next_command = 'echo Awake now!'

t1 = BashOperator(
    task_id='sleep_for_10s',   # Causes the task to miss the SLA
    bash_command='sleep 10',
    sla=timedelta(seconds=5),  # SLA for the task
    dag=dag
)

t2 = BashOperator(
    task_id='next_task',
    bash_command=next_command,
    dag=dag
)

t2.set_upstream(t1)
The first task has an SLA defined, but the second task does not.
When this DAG runs on its schedule, task t1 sleeps for 10 seconds and misses its 5-second SLA. This causes an entry to be created in the scheduler log and also generates an email notification to the DAG owner. See below.

Additionally, I also got an email notification, as I had set up SMTP in airflow.cfg.
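For reference, the email notifications depend on the [smtp] section of airflow.cfg being configured, along the lines of the sketch below (host, credentials, and sender address are placeholders for your own values):

```ini
[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = airflow_user
smtp_password = my_password
smtp_port = 587
smtp_mail_from = airflow@example.com
```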
Define SLA on DAG level
SLAs at the DAG level are equally simple to define. They can be attached to a DAG as part of its default arguments. See the DAG below and note the 'sla' entry, the last line of default_args. The SLA now applies to every task in the DAG.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
def cw_sla_missed_take_action(*args, **kwargs):
    # Logs to scheduler logs. Not in the scheduler UI.
    # Location $AIRFLOW_HOME/logs/scheduler/
    logger.info("************************************* SLA missed! ***************************************")
    logger.info(args)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 14),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'sla': timedelta(seconds=5)
}

dag = DAG('hello_sla_dag',
          schedule_interval='* * * * *',
          default_args=default_args,
          catchup=False,
          sla_miss_callback=cw_sla_missed_take_action  # callback function
          )

next_command = 'echo Awake now!'

t1 = BashOperator(
    task_id='sleep_for_10s',  # Causes the task to miss the SLA
    bash_command='sleep 10',
    dag=dag
)

t2 = BashOperator(
    task_id='next_task',
    bash_command=next_command,
    dag=dag
)

t2.set_upstream(t1)
Below is a screenshot of the scheduler log entry generated by the callback function.

Finally, let’s also look at the notification email, which lists the tasks that missed their SLAs.
Airflow UI – SLA Misses
You can also monitor SLA miss events which have happened in the Airflow UI by going to Browse -> SLA Misses

It will take you to this screen

Conclusion
As you can see from the post above, it is not really difficult to set up SLAs in Airflow. It can make your failure management, whether for outright failures or delayed jobs, a lot more streamlined. If you like this post, please do share it and spread the knowledge!