Airflow – XCOM

Introduction

Airflow XCom (short for cross-communication) is used for inter-task communication. It may sound complex, but it is simple both in how Airflow implements it and in how you use it, and it has numerous use cases.

Inter-task communication is achieved by passing key-value pairs between tasks. Tasks can run on any Airflow worker and need not run on the same worker. To pass information, a task pushes a key-value pair. Another task then pulls that key-value pair and uses it.
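
The push and pull idea can be sketched without Airflow at all. The snippet below is a toy model only: the dictionary `xcom_store` and the helper functions `push` and `pull` are made-up names for illustration and are not Airflow's API. It simply shows that the two tasks never talk to each other directly; they exchange data through a shared store, keyed by who pushed the value and under which key.

```python
# Toy model only: a dict stands in for the shared storage behind XCom.
xcom_store = {}


def push(dag_id, task_id, key, value):
    """Record a value under the pushing task's identity and a key."""
    xcom_store[(dag_id, task_id, key)] = value


def pull(dag_id, task_id, key):
    """Look up a value pushed by the named task; None if nothing was pushed."""
    return xcom_store.get((dag_id, task_id, key))


# An upstream task pushes, a downstream task pulls.
push('hello_xcom', 'task_xcom_push', 'my_key', 'my_value')
pulled = pull('hello_xcom', 'task_xcom_push', 'my_key')
print(pulled)
```

Because the store sits outside both tasks, it does not matter which worker runs the pushing task and which runs the pulling one.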

This blog entry requires some knowledge of Airflow. If you are just starting out, I would suggest you first get familiar with Airflow. You can try this link

This blog entry is divided into:

  • Pushing values to XCOM
  • Viewing XCOM values in Airflow UI
  • Pulling XCOM values

Pushing values to XCOM

Before we dive headlong into XCOM, let’s see where to find the values. XCOM entries can be viewed in the Airflow Web UI->Admin->XComs

Let’s look at a DAG below. Its sole purpose in life is to push a key-value pair using XCOM. It will also help us discover some of the mechanics behind the concept.

# Filename: hello_xcom.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# Step 1 - Define a method for pushing a key value pair to XCOM
def f_send_value_via_xcom(**context):
    print('Hello xcom push')
    context['ti'].xcom_push(key='my_key', value='my_value')

# Step 2 - Default arguments for airflow DAG
default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime(2020, 1, 25),
  'email': ['[email protected]'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
}

# Step 3 - Create an airflow DAG
dag = DAG(
    'hello_xcom',
    schedule_interval='0 0 * * *',
    default_args=default_args
)


# Step 4 - Create a Linux command string
linux_command = "echo xcom"

# Step 5 - Create a task for running the Linux command
t1 = BashOperator(
    task_id='Hello',
    bash_command=linux_command,
    dag=dag)

# Step 6 - Create a task for pushing key-value pair to xcom, created in step1
t2 = PythonOperator(
    task_id='task_xcom_push',
    python_callable=f_send_value_via_xcom,
    provide_context=True,
    dag=dag
)

# Step 7 - Task execution order
t1 >> t2

Let’s analyse the two most important steps.

  • Step 1 – Defines a Python function which pushes a key-value pair to XCOM. Its second line reads the task instance from the context (context['ti']) and calls its xcom_push method, pushing the key my_key with the value my_value.
  • Step 6 – Creates a PythonOperator task and sets provide_context=True, which makes Airflow pass the context needed in Step 1 to the function.
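
To see what Step 1 does with the context, the function can be exercised outside Airflow. This is a minimal sketch under an assumption: `FakeTaskInstance` is a hypothetical stand-in written for this example, not an Airflow class. It only records what is pushed, so we can check that the function reads 'ti' from the context and calls xcom_push on it.

```python
class FakeTaskInstance(object):
    """Hypothetical stand-in for Airflow's task instance; it only records pushes."""

    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        self.pushed[key] = value


def f_send_value_via_xcom(**context):
    # Same call as in Step 1: 'ti' is the task instance found in the context.
    context['ti'].xcom_push(key='my_key', value='my_value')


fake_ti = FakeTaskInstance()
# Passing ti=... by hand imitates the context that Airflow would supply.
f_send_value_via_xcom(ti=fake_ti)
print(fake_ti.pushed)
```

In a real run Airflow builds the context itself, and it contains many more entries than 'ti'. The stub is just a cheap way to test the function in isolation.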

Viewing XCOM values in Airflow UI

Now let’s run the task. Once done, go to Admin->XComs and you should see the pushed key-value pair listed there.

Let’s look at where Airflow stores all the data shown above. It is stored in the Airflow metastore, in a table imaginatively called xcom.
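
To get a feel for what such a table holds, here is a rough sketch using an in-memory SQLite database. It is an illustration under assumptions, not Airflow's actual schema: the column list is simplified, and storing the value as JSON is just a choice made for this example. Check the xcom table in your own metastore for the real layout.

```python
import json
import sqlite3

# In-memory database standing in for the metastore (illustration only).
conn = sqlite3.connect(':memory:')
conn.execute(
    """
    CREATE TABLE xcom (
        id INTEGER PRIMARY KEY,
        key TEXT,
        value BLOB,
        execution_date TEXT,
        task_id TEXT,
        dag_id TEXT
    )
    """
)

# In this model a push is one inserted row per key.
conn.execute(
    "INSERT INTO xcom (key, value, execution_date, task_id, dag_id) "
    "VALUES (?, ?, ?, ?, ?)",
    ('my_key', json.dumps('my_value'), '2020-01-25T00:00:00',
     'task_xcom_push', 'hello_xcom'),
)

# A pull is a lookup filtered by DAG, run date and key.
row = conn.execute(
    "SELECT value FROM xcom WHERE dag_id = ? AND execution_date = ? AND key = ?",
    ('hello_xcom', '2020-01-25T00:00:00', 'my_key'),
).fetchone()
pulled_value = json.loads(row[0])
print(pulled_value)
```

Since every value ends up as a row in the metastore, XCOM suits small pieces of data such as flags, file names or IDs rather than large datasets.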

Pulling XCOM values

Now let’s take a step further and add a step for pulling XCOM values. This is achieved by using an xcom_pull method. The airflow DAG below has a task which pushes a key-value pair via XCOM and then another task which pulls the key-value pair. See the code below.

# Filename: hello_xcom2.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# Step 1 - Define a method for pushing a key value pair to XCOM
def f_send_value_via_xcom(**context):
    print('Hello xcom push')
    context['ti'].xcom_push(key='my_key', value='my_value')

# Step 2 - Define a method for pulling a key value pair from XCOM
def f_get_value_via_xcom(**context):
    print('Hello xcom pull')
    my_v = context['ti'].xcom_pull(dag_id='hello_xcom2', key='my_key')
    print('The value from previous task is {}'.format(my_v))

# Step 3 - Default arguments for airflow DAG
default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime(2020, 1, 25),
  'email': ['[email protected]'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
}

# Step 4 - Create an airflow DAG
dag = DAG(
    'hello_xcom2',
    schedule_interval='0 0 * * *',
    default_args=default_args
)


# Step 5 - Create a Linux command string
linux_command = "echo xcom"

# Step 6 - Create a task for running the Linux command
t1 = BashOperator(
    task_id='Hello',
    bash_command=linux_command,
    dag=dag)

# Step 7 - Create a task for pushing key-value pair to XCOM, created in step1
t2 = PythonOperator(
    task_id='task_xcom_push',
    python_callable=f_send_value_via_xcom,
    provide_context=True,
    dag=dag
)

# Step 8 - Create a task for pulling key-value pair via XCOM, created in step2
t3 = PythonOperator(
    task_id='task_xcom_pull',
    python_callable=f_get_value_via_xcom,
    provide_context=True,
    dag=dag
)

# Step 9 - Task execution order
t1 >> t2 >> t3

The above DAG adds only one step to the previous one. Let’s analyse the new parts of the code.

  • Step 2 – Defines a new function, f_get_value_via_xcom, which uses xcom_pull to get values from XCOM.
  • Step 8 – Defines a new task, t3, which uses a PythonOperator to call the function defined in Step 2.
Note: This becomes more interesting when you add the following dimensions:
- Pulling values across DAGs and tasks, and from previous runs.
- Pulling values regardless of where the tasks are executed.
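
The dimensions in the note map onto parameters of xcom_pull. The sketch below shows two calling patterns against a stub. `RecordingTaskInstance` and `f_pull_variants` are hypothetical names made up for this example. The parameter names (task_ids, dag_id, key, include_prior_dates) follow the xcom_pull signature as I understand it for Airflow 1.10, so treat them as an assumption and verify them against the source of your Airflow version.

```python
class RecordingTaskInstance(object):
    """Hypothetical stub that records how xcom_pull was called."""

    def __init__(self):
        self.calls = []

    def xcom_pull(self, task_ids=None, dag_id=None, key='return_value',
                  include_prior_dates=False):
        self.calls.append({
            'task_ids': task_ids,
            'dag_id': dag_id,
            'key': key,
            'include_prior_dates': include_prior_dates,
        })
        return None  # the stub stores no values


def f_pull_variants(**context):
    ti = context['ti']
    # Pull from one specific task in the current DAG.
    ti.xcom_pull(task_ids='task_xcom_push', key='my_key')
    # Pull a value pushed by a task in another DAG, also accepting earlier runs.
    ti.xcom_pull(dag_id='hello_xcom', task_ids='task_xcom_push',
                 key='my_key', include_prior_dates=True)


stub_ti = RecordingTaskInstance()
f_pull_variants(ti=stub_ti)
for call in stub_ti.calls:
    print(call)
```

Naming the pushing task through task_ids makes it explicit where a value comes from, which helps once several tasks push under the same key.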

When the DAG is executed, the pushed key-value pair is visible in Admin->XComs.

The execution log of the pulling task shows the value pulled via XCOM.

Make sure you have a look at the xcom_pull method in detail. It has some interesting and very useful parameters. If you want to dig deeper, I suggest you head over to the link below

It links to the source code of the XCOM methods, which also exposes some really nice methods that can come in handy for some use cases.

This brings us to the end of Airflow XCOM. Hope this blog gets you started on XCOMs and exploring them. If you like the blog entry, do leave a comment and tell us what we can write about. Till next time….byeee!!!
