Introduction
Airflow queues and workers are needed when we want to make our Airflow infrastructure more flexible and resilient. This blog entry describes how to install, set up and configure the additional components so that we can run Airflow in that manner.
Till now we have used a local executor to execute all our jobs, which works fine if we have a small number of jobs and they never run at the same time as one another. However, in the real world this is rarely the case. Additionally, we also need to cater for the possibility of the Airflow local executor becoming unavailable.
Airflow queues are like any other queues and use a messaging system such as RabbitMQ or ActiveMQ. The Airflow scheduler sends tasks as messages to the queues and hence acts as a publisher. Airflow workers are configured to listen for events (i.e. tasks) arriving on particular queues and execute those tasks. Pretty cool – have a look at the diagram below.

To create an infrastructure like this, we need to do the following steps:
- Install & Configure RabbitMQ on a separate host – 1 server
- Install & Configure Airflow with RabbitMQ and Celery Executor support – 4 servers
- Start airflow scheduler & webserver
- Start airflow workers
- Start flower UI
- Run a DAG
- Watch it all happen!
Install & Configure RabbitMQ
This step is probably the most tricky part and can be broken into four smaller steps.
Note: Only do this step if you do not have a RabbitMQ setup. If you already have one, you can skip this step.
Step-1a – Install RabbitMQ
We will install RabbitMQ on a separate machine. For the purposes of this blog entry, I have created an AWS EC2 instance (RedHat 8) on which RabbitMQ will be installed.
Download the following
- Download Erlang rpm from this link onto your EC2 instance
- Download the RabbitMQ rpm from this link onto your EC2 instance
Execute the following commands
sudo yum install logrotate
sudo yum install erlang-20.1.7-1.el6.x86_64.rpm
sudo yum install rabbitmq-server-3.7.0-1.el6.noarch.rpm


Step-1b – Enable and Start RabbitMQ
Use the following commands to enable and start RabbitMQ server
sudo systemctl start rabbitmq-server
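Assuming a systemd-based setup (as on RedHat 8), the enable command is:
sudo systemctl enable rabbitmq-server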

Step-1c – Configure RabbitMQ
There are 4 commands to complete our configuration:
- Create a user – rabbitmqctl add_user <username> <password>
- Create a vHost – rabbitmqctl add_vhost <vhost-name>
- Add user tags – rabbitmqctl set_user_tags <username> <tag>
- Set privileges – rabbitmqctl set_permissions
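Assuming the credentials used later in this post (airflow_mq for both the username and the password), the add_user command would look something like:
sudo rabbitmqctl add_user airflow_mq airflow_mq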
sudo rabbitmqctl add_vhost airflow_mq_host
sudo rabbitmqctl set_user_tags airflow_mq administrator
sudo rabbitmqctl set_permissions -p airflow_mq_host airflow_mq ".*" ".*" ".*"

Step-1d – Enable RabbitMQ management UI
Even though RabbitMQ is up and running, we need a way to see what is going on inside it, and sure enough there is a management plugin we can enable.
Use the following commands
sudo rabbitmq-plugins enable rabbitmq_management

Navigate to /etc/rabbitmq and create a file rabbitmq.config
Now you can open the management interface at the following URL and use airflow_mq for the username and password:
http://<server-name>:15672


Install & Configure Airflow with RabbitMQ and Celery Executor
This part needs to be performed on all the Airflow servers in exactly the same way. You can have as many Airflow servers as you need, just make sure all of them have the same airflow.cfg! All the Airflow configuration for all the components is stored in that same airflow.cfg.
Step-2a – Install Airflow with RabbitMQ and Celery Support
Execute the following on your EC2 Instance
sudo pip install psycopg2
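Assuming a pip-based install of Airflow 1.10.x with the Celery and Postgres extras, installing Airflow itself would look something like:
sudo pip install 'apache-airflow[celery,postgres]'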
Note: Airflow (1.10.x) pulls in certain libraries as Celery optimisations for RabbitMQ, but they usually get in the way of Airflow working normally. So uninstall those optimisations (for now) – this may be fixed in subsequent Airflow versions.
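The usual suspect in Airflow 1.10.x is the librabbitmq C library pulled in as a Celery optimisation (an assumption on my part, since the package is not named above), which can be removed with:
sudo pip uninstall librabbitmq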
Additionally, you also need to pin marshmallow-sqlalchemy to an older version if you are using 1.10.5+. It is sad that the dependencies of Airflow are a bit all over the place, but that is all.
sudo pip install marshmallow-sqlalchemy==0.17.1
Step-2b – Configure Airflow – Set Executor
Set the executor in airflow.cfg to CeleryExecutor
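A minimal sketch of the relevant line in the [core] section of airflow.cfg:
executor = CeleryExecutor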

Step-2c – Configure Airflow – Metadata DB
The hard part is now out of the way and the rest of the configuration is done in airflow.cfg. Provide the metadata database details – see an example below
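Assuming a PostgreSQL metadata database (which is why psycopg2 was installed earlier) and placeholder host, user, password and database names, the sql_alchemy_conn entry in the [core] section would look something like:
sql_alchemy_conn = postgresql+psycopg2://airflow_user:airflow_pass@<db-server>:5432/airflow_db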

Step-2d – Configure Airflow – Celery configuration
Scroll down airflow.cfg to the section called [celery] and make the following modifications.
Set the Celery broker URL to point to the RabbitMQ server, as below
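Assuming the RabbitMQ user, password and vhost created in Step-1c, and the default AMQP port, the broker setting would look something like:
broker_url = amqp://airflow_mq:airflow_mq@<rabbitmq-server>:5672/airflow_mq_host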

Set the Celery result backend DB – this is the same database that Airflow uses; Celery writes task status updates to it. Note the way the database is configured – it is slightly different from the format used earlier in Step-2c
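Assuming the same PostgreSQL database as in Step-2c, the result backend entry (note the db+ prefix, which is the slight difference referred to above) would look something like:
result_backend = db+postgresql://airflow_user:airflow_pass@<db-server>:5432/airflow_db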

All done with the configuration! – Phew
Step-3 – Start Airflow Scheduler & Webserver
These can be started as below
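A typical invocation on the scheduler/webserver node (assuming the default ports) looks like:
airflow webserver -p 8080
airflow scheduler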
Observe that the webserver detects it is using the CeleryExecutor

While starting the scheduler for use with Celery, an additional startup parameter is added to the startup command. See below

Step-4 – Start Airflow Worker(s)
The idea here is that an airflow worker executes the tasks sent to it by the airflow scheduler. Each worker is also associated with a queue; the name of the queue is given when the worker is started. See below
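Assuming the queue names used in the DAG further down, starting a worker for the first queue would look like:
airflow worker -q cloudwalker_q1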

We are in business – our environment is up and running, and we can now start our remaining worker nodes. Start the remaining two worker nodes with the following command. Essentially, we want two worker nodes servicing the queue cloudwalker_q2 so that, just in case one worker node goes down, Airflow will still be able to service the queue.
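Assuming the same syntax as before, each of the two remaining nodes would run:
airflow worker -q cloudwalker_q2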
IMPORTANT: It is possible to serve a single queue with multiple workers. This introduces resilience into workflow orchestration: if one of the airflow workers becomes unavailable, the tasks are executed by the other workers servicing the queue.
Step-5 Start Flower UI
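Assuming Airflow 1.10.x, Flower is started on one of the nodes with:
airflow flower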

This will start the flower UI for monitoring all the tasks. You can access the flower UI using the following URL – http://<server-name>:5555

Step-6 Start a DAG
Now let's execute a DAG. Because we are now using workers and queues, additional information needs to be added in the DAG code to specify the queues. This is done at the task level. See below
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 9, 9),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'hello_world2',
    schedule_interval='0 0 * * *',
    catchup=False,
    default_args=default_args
)

create_command = 'echo *************************************$(hostname)*************************************'

t1 = BashOperator(
    task_id='task_for_q1',
    bash_command=create_command,
    queue='cloudwalker_q1',
    dag=dag
)

t2 = BashOperator(
    task_id='task_for_q2',
    bash_command=create_command,
    queue='cloudwalker_q2',
    dag=dag
)

t2.set_upstream(t1)
Observe that in the code we have added an additional parameter called queue when calling the BashOperator.
Step-7 Watch it happen!
Step-7a Go to the Airflow UI & trigger the DAG

Step-7b Check the DAG

Step-7c Check the logs – task_for_q1

Step-7d Check the logs – task_for_q2

We have now come to the end of this blog. If you have made it till here – well done! Hope you find this blog entry useful.