Airflow – Scale out with RabbitMQ and Celery

Introduction

Airflow queues and workers are needed when we want to make the Airflow infrastructure more flexible and resilient. This blog entry describes how to install, set up and configure the additional components that let us use Airflow in that way.

So far we have used the LocalExecutor to execute all our jobs. That works fine when we have a small number of jobs that seldom overlap. In the real world, however, this is rarely the case. We also need to account for the possibility that the single machine running the LocalExecutor becomes unavailable, taking all task execution down with it.

Airflow queues are like any other queues and use a messaging system such as RabbitMQ or Redis. The Airflow scheduler sends tasks as messages to the queues and hence acts as a publisher. Airflow workers are configured to listen for events (i.e., tasks) arriving on particular queues and to execute those tasks. Pretty cool – have a look at the diagram below.
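To make the publisher/worker relationship concrete, here is a toy, in-process simulation in plain Python. It does not use Airflow, Celery or RabbitMQ at all: queue.Queue objects stand in for broker queues, a hypothetical publish() function stands in for the scheduler, and threads stand in for workers. The queue names mirror the ones used later in this post; the worker names are made up for illustration.

```python
import queue
import threading
from collections import defaultdict

# Stand-ins for broker queues (names match the queues used later in this post)
QUEUES = {"cloudwalker_q1": queue.Queue(), "cloudwalker_q2": queue.Queue()}
STOP = object()  # sentinel message telling a worker to exit

executed = defaultdict(list)  # worker name -> task ids it ran
lock = threading.Lock()


def publish(queue_name: str, message: object) -> None:
    """Scheduler side: put a message on a named queue."""
    QUEUES[queue_name].put(message)


def worker(name: str, queue_name: str) -> None:
    """Worker side: consume messages only from the queue this worker listens on."""
    q = QUEUES[queue_name]
    while True:
        msg = q.get()
        if msg is STOP:
            break
        with lock:
            executed[name].append(msg)


# One worker on cloudwalker_q1, two workers sharing cloudwalker_q2 (hypothetical names)
workers = {
    "worker-1": "cloudwalker_q1",
    "worker-2": "cloudwalker_q2",
    "worker-3": "cloudwalker_q2",
}
threads = [threading.Thread(target=worker, args=(n, q)) for n, q in workers.items()]
for t in threads:
    t.start()

for i in range(3):
    publish("cloudwalker_q1", f"task_for_q1_{i}")
    publish("cloudwalker_q2", f"task_for_q2_{i}")

# Queues are FIFO, so each worker sees all tasks before its STOP; one STOP per worker
for queue_name in workers.values():
    publish(queue_name, STOP)
for t in threads:
    t.join()

for name, tasks in sorted(executed.items()):
    print(name, tasks)
```

Every cloudwalker_q1 task lands on worker-1, while each cloudwalker_q2 task is run by exactly one of the two q2 workers (which one is not deterministic). If one q2 worker were missing, the other would still drain the queue, which is the property the real setup relies on.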

To create an infrastructure like this we need to do the following steps

  1. Install & Configure RabbitMQ on a separate host – 1 server
  2. Install & Configure Airflow with RabbitMQ and Celery Executor support – 4 servers
  3. Start airflow scheduler & webserver
  4. Start airflow workers
  5. Start flower UI
  6. Run a DAG
  7. Watch it all happen!

Step-1 Install & Configure RabbitMQ

This is probably the trickiest step, and it can be broken into four smaller steps.

Note: Only do this step if you do not already have a RabbitMQ setup. If you do, you can skip it.

Step-1a – Install RabbitMQ

We will install RabbitMQ on a separate machine. For the purposes of this blog entry, I created an AWS EC2 instance (Red Hat 8) and installed RabbitMQ on it.

Download the following:

  • The Erlang RPM from this link onto your EC2 instance
  • The RabbitMQ RPM from this link onto your EC2 instance

Execute the following commands

sudo yum install socat
sudo yum install logrotate
sudo yum install erlang-20.1.7-1.el6.x86_64.rpm
sudo yum install rabbitmq-server-3.7.0-1.el6.noarch.rpm

Step-1b – Enable and Start RabbitMQ

Use the following commands to enable the RabbitMQ server at boot and start it:

sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server
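Once the service is running, a quick way to confirm that other machines (for example, the Airflow servers) can reach the broker is to open a TCP connection to RabbitMQ's AMQP port, 5672 by default. Below is a minimal sketch using only Python's standard library; the function name is hypothetical, and a successful connection only proves the port is reachable, not that the credentials work.

```python
import socket


def broker_port_open(host: str, port: int = 5672, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within `timeout` seconds."""
    try:
        # create_connection resolves the name and tries each returned address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused connections, unreachable hosts, DNS failures and timeouts all land here
        return False
```

Run broker_port_open("<server-name>") from one of the Airflow servers. A False result usually means the service is not running or the EC2 security group does not allow inbound traffic on port 5672.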

Step-1c – Configure RabbitMQ

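Four commands complete our configuration:

  • Create a user – rabbitmqctl add_user <username> <password>
  • Create a vHost – rabbitmqctl add_vhost <vhost>
  • Add user tags – rabbitmqctl set_user_tags <username> <tag>
  • Set permissions – rabbitmqctl set_permissions -p <vhost> <username> <configure> <write> <read>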
sudo rabbitmqctl add_user airflow_mq airflow_mq
sudo rabbitmqctl add_vhost airflow_mq_host
sudo rabbitmqctl set_user_tags airflow_mq administrator
sudo rabbitmqctl set_permissions -p airflow_mq_host airflow_mq ".*" ".*" ".*"

Step-1d Enable RabbitMQ management UI

Even though RabbitMQ is now running, we need a way to see what is going on inside it, and sure enough, there is a management plugin we can enable.

Use the following commands

cd /usr/lib/rabbitmq/bin
sudo rabbitmq-plugins enable rabbitmq_management

Navigate to /etc/rabbitmq and create a file called rabbitmq.config with the following contents:

[{rabbit, [{tcp_listeners, [{"", 5672}]},{loopback_users, []}]}].

Restart the server (sudo systemctl restart rabbitmq-server) so that the new configuration is picked up. You can then open the management interface at

http://<server-name>:15672 and log in with airflow_mq as both username and password.
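The management plugin also exposes an HTTP API on the same port, which is handy for scripted checks. As a sketch, assuming the airflow_mq credentials from Step-1c and a placeholder host name, the hypothetical helpers below build an authenticated request for the /api/overview endpoint and decode its JSON reply.

```python
import base64
import json
import urllib.request


def overview_request(host: str, user: str, password: str,
                     port: int = 15672) -> urllib.request.Request:
    """Build a GET request for the management API's /api/overview endpoint (HTTP basic auth)."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"http://{host}:{port}/api/overview",
        headers={"Authorization": f"Basic {token}"},
    )


def fetch_overview(host: str, user: str = "airflow_mq", password: str = "airflow_mq",
                   port: int = 15672, timeout: float = 5.0) -> dict:
    """Send the request and decode the JSON body (raises urllib.error.URLError on failure)."""
    req = overview_request(host, user, password, port)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp)
```

For example, fetch_overview("<server-name>")["rabbitmq_version"] should report the broker version if the plugin is enabled and the credentials are accepted.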

Step-2 Install & Configure Airflow with RabbitMQ and Celery Executor

This part must be performed in exactly the same way on every Airflow server. You can have as many Airflow servers as you like; just make sure all of them have the same airflow.cfg, because the configuration for all the components (webserver, scheduler and workers) is stored in that one file.

Step-2a – Install Airflow with RabbitMQ and Celery Support

Execute the following on each Airflow EC2 instance:

sudo pip install 'apache-airflow[celery,rabbitmq,s3,postgres,crypto,jdbc]'
sudo pip install psycopg2

Note: Airflow 1.10.x installs certain libraries (librabbitmq) as part of Celery's optimisations for RabbitMQ, but they tend to get in the way of Airflow's normal operation. Uninstall them for now; this may be fixed in later Airflow versions.

sudo pip uninstall librabbitmq

Additionally, if you are using Airflow 1.10.5+, you need to downgrade marshmallow-sqlalchemy. It is a pity that Airflow's dependencies are a bit all over the place, but that is how it is.

sudo pip uninstall marshmallow-sqlalchemy
sudo pip install marshmallow-sqlalchemy==0.17.1
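To confirm each server ended up in the intended state after these pip commands, you can check the installed distributions from Python. This sketch uses importlib.metadata (Python 3.8 or newer); the helper names are hypothetical and the expected versions simply restate the two fixes above.

```python
from importlib import metadata
from typing import Dict, List, Optional

# Distribution name -> required version, or None meaning "must not be installed"
EXPECTED: Dict[str, Optional[str]] = {
    "librabbitmq": None,
    "marshmallow-sqlalchemy": "0.17.1",
}


def installed_version(dist: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist)
    except metadata.PackageNotFoundError:
        return None


def check_pins(expected: Dict[str, Optional[str]]) -> List[str]:
    """Return one human-readable problem per distribution that does not match."""
    problems = []
    for dist, want in expected.items():
        have = installed_version(dist)
        if have != want:
            problems.append(
                f"{dist}: expected {want or 'not installed'}, found {have or 'not installed'}"
            )
    return problems


if __name__ == "__main__":
    for problem in check_pins(EXPECTED):
        print(problem)
```

An empty result from check_pins(EXPECTED) means both fixes are in place on that server.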

Step-2b – Configure Airflow – Set Executor

In the [core] section of airflow.cfg, set executor to CeleryExecutor.
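Since every server must agree on this value, it can help to check what Airflow will actually see. Airflow lets an environment variable named AIRFLOW__{SECTION}__{KEY} override the matching airflow.cfg entry, so the hypothetical helper below checks that variable first and then falls back to the file. It is a simplified sketch that ignores Airflow's other override mechanisms.

```python
import configparser
import os
from typing import Mapping, Optional


def effective_setting(cfg_text: str, section: str, key: str,
                      env: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Return the value for [section] key, letting AIRFLOW__SECTION__KEY take precedence."""
    env = os.environ if env is None else env
    env_name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_name in env:
        return env[env_name]
    # interpolation=None keeps %-style strings (such as log formats) unexpanded
    cfg = configparser.ConfigParser(interpolation=None)
    cfg.read_string(cfg_text)
    # fallback=None covers both a missing section and a missing key
    return cfg.get(section, key, fallback=None)
```

For example, pass the contents of ~/airflow/airflow.cfg (the default AIRFLOW_HOME location) with section "core" and key "executor"; every server should report CeleryExecutor.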

Step-2c – Configure Airflow – Metadata DB

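The hard part is now out of the way; the rest of the configuration is done in airflow.cfg. Provide the details of the metadata database that all Airflow servers share (the sql_alchemy_conn setting in the [core] section) – see an example below.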
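As a sketch of what sql_alchemy_conn should contain for a PostgreSQL metadata database (the install above pulls in PostgreSQL support), the hypothetical helpers below assemble the URL with the psycopg2 driver and percent-encode the credentials. One assumption worth flagging: Airflow reads airflow.cfg with configparser-style % interpolation (which is why the stock file writes log formats with %%), so any % in the final value is doubled when rendering the line. The user, password and host are placeholders.

```python
from urllib.parse import quote


def sql_alchemy_conn(user: str, password: str, host: str,
                     database: str = "airflow", port: int = 5432) -> str:
    """Build a SQLAlchemy URL for a PostgreSQL metadata DB using the psycopg2 driver."""
    # Percent-encode credentials so characters such as '@', ':' or '/' cannot break the URL
    return (f"postgresql+psycopg2://{quote(user, safe='')}:{quote(password, safe='')}"
            f"@{host}:{port}/{database}")


def cfg_line(key: str, value: str) -> str:
    """Render a key = value line for airflow.cfg, doubling '%' for configparser interpolation."""
    return f"{key} = {value.replace('%', '%%')}"


# Hypothetical credentials and host; substitute your own
print(cfg_line("sql_alchemy_conn", sql_alchemy_conn("airflow", "p@ss", "db-host")))
# prints: sql_alchemy_conn = postgresql+psycopg2://airflow:p%%40ss@db-host:5432/airflow
```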

Step-2d – Configure Airflow – Celery configuration

Further down airflow.cfg there is a section called [celery]; make the following changes there.

Set the Celery broker URL (broker_url) to point to the RabbitMQ server, as below:
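The broker URL combines the pieces created in Step-1c: the user, its password, the RabbitMQ host and the virtual host, which goes in the URL path. A minimal sketch follows, with defaults taken from Step-1c and a placeholder host name; the helper itself is hypothetical.

```python
from urllib.parse import quote


def broker_url(host: str, user: str = "airflow_mq", password: str = "airflow_mq",
               vhost: str = "airflow_mq_host", port: int = 5672) -> str:
    """Build an AMQP broker URL; the path component names the RabbitMQ virtual host."""
    # safe='' makes quote() encode every reserved character, including '/'
    return (f"amqp://{quote(user, safe='')}:{quote(password, safe='')}"
            f"@{host}:{port}/{quote(vhost, safe='')}")
```

With the Step-1c values, broker_url("<server-name>") yields amqp://airflow_mq:airflow_mq@<server-name>:5672/airflow_mq_host, which is the shape of value to put in broker_url.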

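Set the Celery result backend (result_backend). This is the same database that Airflow uses; Celery records the state of Airflow task executions in it. Note that the database URL is written slightly differently from the one in Step-2c: Celery's database result backend expects the URL to start with db+ (for example db+postgresql://).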
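Because the result backend points at the same database, its value can be derived from sql_alchemy_conn. Below is a sketch, assuming a PostgreSQL metadata DB, that swaps the SQLAlchemy scheme for the db+postgresql form used in Celery's and Airflow's documentation. The function name and connection strings are hypothetical.

```python
from urllib.parse import urlsplit, urlunsplit


def celery_result_backend(sql_alchemy_conn: str) -> str:
    """Turn a PostgreSQL SQLAlchemy URL into a Celery database result-backend URL."""
    parts = urlsplit(sql_alchemy_conn)
    # Accept both postgresql:// and driver-qualified forms like postgresql+psycopg2://
    if parts.scheme.split("+")[0] != "postgresql":
        raise ValueError(f"expected a PostgreSQL URL, got scheme {parts.scheme!r}")
    # Celery selects its SQLAlchemy-based backend from the 'db+' prefix
    return urlunsplit(parts._replace(scheme="db+postgresql"))
```

For example, celery_result_backend("postgresql+psycopg2://airflow:secret@db-host:5432/airflow") returns db+postgresql://airflow:secret@db-host:5432/airflow.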

All done with the configuration! – Phew

Step-3 – Start Airflow Scheduler & Webserver

These can be started as below

airflow webserver

Observe that the webserver detects that it is using the CeleryExecutor.

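When starting the scheduler for use with Celery, add the -p (--do_pickle) parameter to the startup command. It makes the scheduler pickle each DAG and send it to the workers, instead of letting the workers run their own copy of the DAG code. See below: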

airflow scheduler -p

Step-4 – Start Airflow Worker(s)

The idea is that each Airflow worker executes the tasks sent to it by the Airflow scheduler. Each worker is also associated with one or more queues (a comma-separated list), whose names are given when the worker is started. See below:

airflow worker -q cloudwalker_q1

We are in business: our environment is up and running, and we can now start our remaining worker nodes. Start the other two worker nodes with the following command. We want two worker nodes servicing the queue cloudwalker_q2 so that, if one of them goes down, Airflow can still service the queue.

airflow worker -q cloudwalker_q2

IMPORTANT: A single queue can be served by multiple workers. This adds resilience to workflow orchestration: if one of the Airflow workers becomes unavailable, the other workers servicing that queue continue to pick up its tasks.

Step-5 Start Flower UI

airflow flower

This starts the Flower UI for monitoring Celery workers and tasks. You can access it at http://<server-name>:5555
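Flower also serves a small HTTP API next to the UI, which can confirm that all three workers have registered. The sketch below assumes the /api/workers endpoint returns a JSON object keyed by worker name, as in Flower's API documentation. Newer Flower releases may require authentication before the API responds, and the helper names are hypothetical.

```python
import json
import urllib.request
from typing import List


def worker_names(payload: dict) -> List[str]:
    """Extract worker names from a /api/workers reply (a JSON object keyed by worker name)."""
    return sorted(payload)


def flower_workers(host: str, port: int = 5555, timeout: float = 5.0) -> List[str]:
    """Fetch /api/workers from Flower and return the registered worker names."""
    url = f"http://{host}:{port}/api/workers"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return worker_names(json.load(resp))
```

With the setup above, flower_workers("<server-name>") should list three workers once all of them have started.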

Step-6 Start a DAG

Now let’s execute a DAG. Because we are now using workers and queues, the DAG code needs to say which queue each task should be sent to. This is done at the task level. See below:

# Filename: hello_world2.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime(2019, 9, 9),
  'email': ['airflow@example.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
  # 'queue': 'bash_queue',
  # 'pool': 'backfill',
  # 'priority_weight': 10,
  # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
  'hello_world2',
  schedule_interval='0 0 * * *',
  catchup=False,
  default_args=default_args,
)

# Double quotes make bash print the asterisks literally instead of expanding them as a glob
create_command = 'echo "*************************************$(hostname)*************************************"'

t1 = BashOperator(
  task_id='task_for_q1',
  bash_command=create_command,
  queue='cloudwalker_q1',
  dag=dag
)

t2 = BashOperator(
  task_id='task_for_q2',
  bash_command=create_command,
  queue='cloudwalker_q2',
  dag=dag
)

t2.set_upstream(t1)

Observe that we pass an additional parameter, queue, to each BashOperator. Setting 'queue' in default_args instead (as in the commented-out line) would send every task in the DAG to that one queue.

Step-7 Watch it happen!

Step-7a Go to the Airflow UI & trigger the DAG

Step-7b Check the Dag

Step-7c Check the logs – task_for_q1

Step-7d Check the logs – task_for_q2

We have now come to the end of this blog entry. If you have made it this far, well done! I hope you find it useful.
