Airflow – Scale-out with Redis and Celery

Introduction

This post uses Redis and Celery to scale out Airflow. Redis is an in-memory data store commonly used as a cache and message broker, and it scales out quite well. It can be made resilient by deploying it as a cluster.

In my previous post, the Airflow scale-out was done using Celery with RabbitMQ as the message broker. On the whole, I found maintaining a RabbitMQ broker a bit fiddly unless you happen to be a RabbitMQ expert.

Redis seems to be the better fit here: it is a lot easier to deploy and maintain than the various steps needed to set up a RabbitMQ broker. In a nutshell, I like it more than RabbitMQ!

To create an infrastructure like this, we need to complete the following steps:

  1. Install & Configure Redis server on a separate host – 1 server
  2. Install & Configure Airflow with Redis and Celery Executor support – 4 servers
  3. Start airflow scheduler & webserver
  4. Start airflow workers
  5. Start flower UI
  6. Run a DAG
  7. Watch it all happen!

Install & Configure Redis Server

We will install redis-server on a separate machine. For the purposes of this blog entry, I have created an AWS EC2 instance (RedHat 8) on which Redis will be installed.

Note: For production, it is recommended to set up Redis as a cluster to add redundancy and eliminate single points of failure.

Download the Redis source code (the stable tarball, redis-stable.tar.gz) from the Redis download page.
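Assuming the stable tarball is still hosted at download.redis.io (check the Redis download page if that changes), it can be fetched with:

wget https://download.redis.io/redis-stable.tar.gz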

Execute the following command to install the required utilities to build the source code

sudo yum install make tcl-devel gcc

Extract the redis source code using the following command

tar -xvf redis-stable.tar.gz

Navigate to the redis-stable directory and execute the following commands one by one

make
sudo make install

Job done! – Redis is now ready to be run. Execute the following command

redis-server
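Note: by default redis-server binds to 127.0.0.1 and runs in protected mode, so remote Airflow hosts will not be able to connect to it. For a throwaway test setup, the relevant settings in the redis.conf shipped with the source can be relaxed along these lines – treat this as an illustrative sketch only, and add a password or firewall rules for anything beyond a sandbox:

# redis.conf – test-only settings so remote Airflow hosts can reach the broker
bind 0.0.0.0
protected-mode no

Then start the server with that file, e.g. redis-server ./redis.conf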

Install & Configure Airflow with Redis and Celery Executor

This part needs to be performed on all the Airflow servers in exactly the same way. You can have as many Airflow servers as you like – just make sure all of them have the same airflow.cfg, since the configuration for every Airflow component lives in that one file.

Step-2a – Install Airflow with Redis and Celery Support

Execute the following on your EC2 Instance

sudo yum install gcc python2-devel
sudo pip install setuptools -U
sudo pip install apache-airflow[celery,redis,s3,postgres,crypto,jdbc]==1.10.4
sudo pip install psycopg2
sudo pip install kombu==4.5.0
sudo pip uninstall marshmallow-sqlalchemy
sudo pip install marshmallow-sqlalchemy==0.17.1

Step-2b – Configure Airflow – Set Executor

Set the executor in airflow.cfg to CeleryExecutor
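In Airflow 1.10.x this setting lives in the [core] section of airflow.cfg:

[core]
executor = CeleryExecutor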

Step-2c – Configure Airflow – Metadata DB

Next, provide the metadata database details in airflow.cfg – see an example below.
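For a Postgres metadata database, the connection string in the [core] section has roughly this shape (host, user, password and database name below are placeholders for illustration):

[core]
sql_alchemy_conn = postgresql+psycopg2://airflow_user:airflow_pass@<metadata-db-host>:5432/airflow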

Step-2d – Configure Airflow – Celery configuration

Scroll down airflow.cfg to the [celery] section and make the following modifications.

Set the Celery broker URL to point to the redis-server, as below.
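With a single, non-clustered Redis instance the broker URL is just the Redis host, port and a database number, along these lines (the hostname is a placeholder):

[celery]
broker_url = redis://<redis-server-host>:6379/0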

Set the Celery result backend – this is the same database that Airflow uses for its metadata; Celery writes task status updates to it. Note the way the database connection is written – it is slightly different from the format used earlier in Step-2c.
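For a Postgres metadata database the result backend uses Celery's db+ prefix rather than the SQLAlchemy driver syntax from Step-2c, roughly like this (placeholder values again):

result_backend = db+postgresql://airflow_user:airflow_pass@<metadata-db-host>:5432/airflow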

The hard part is now out of the way and all the configuration is now done in airflow.cfg.
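Since every node must end up with an identical airflow.cfg, one simple way to push the finished file out (assuming SSH access between the hosts and the default ~/airflow location – the hostnames below are made up) is a small shell loop:

# hypothetical hostnames – replace with your own Airflow nodes
for host in airflow-node2 airflow-node3 airflow-node4; do
  scp ~/airflow/airflow.cfg ${host}:~/airflow/airflow.cfg
done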

Start airflow scheduler & webserver

Start airflow webserver using the following command

airflow webserver

Start the airflow scheduler using the following command

airflow scheduler -p

Start airflow workers

The idea here is that an Airflow worker executes the tasks sent to it by the Airflow scheduler. Each worker is also associated with a queue, and the name of the queue is given when the worker is started. See below.

airflow worker -q cloudwalker_q1

We are in business – our environment is up and running, and now we can start the remaining worker nodes. Essentially we want two worker nodes servicing the queue cloudwalker_q2, so that if one worker node goes down Airflow will still be able to service the queue. Start the remaining two worker nodes with the following command

airflow worker -q cloudwalker_q2

IMPORTANT: It is possible to serve a single queue with multiple workers. This introduces resilience into the workflow orchestration: if one of the Airflow workers becomes unavailable, the tasks are executed by the other workers servicing the queue.

Start Flower UI

airflow flower

This will start the Flower UI for monitoring all the tasks. You can access the Flower UI at the following URL – http://<server-name>:5555

Start a DAG

Now let’s execute a DAG. Because we are now using workers and queues, the DAG code needs some additional information to say which queue each task should be sent to. This is done at the task level. See below.

# Filename: hello_world2.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime(2019, 9, 28),
  'email': ['[email protected]'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
  'hello_world2',
  schedule_interval='0 0 * * *',
  catchup=False,
  default_args=default_args
)
create_command = 'echo *************************************$(hostname)*************************************'

t1 = BashOperator(
  task_id='task_for_q1',
  bash_command=create_command,
  queue='cloudwalker_q1',
  dag=dag
)

t2 = BashOperator(
  task_id='task_for_q2',
  bash_command=create_command,
  queue='cloudwalker_q2',
  dag=dag
)

t2.set_upstream(t1)

Observe that in the code we have added an additional parameter called queue when calling the BashOperator.
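With the CeleryExecutor the usual practice is to keep the dags folder identical on the scheduler, webserver and worker nodes. Once hello_world2.py is in place, the DAG can be triggered from the Airflow UI as in step 7 below, or from the command line in Airflow 1.10:

airflow unpause hello_world2
airflow trigger_dag hello_world2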

Watch it happen!

Step-7a Go to the Airflow UI & trigger the DAG

Step-7b Check the DAG

Step-7c Check the logs – task_for_q1

Step-7d Check the logs – task_for_q2

You now have a working Airflow installation, scaled out with Celery and Redis. This brings us to the end of this blog post. If you have made it this far – well done! Hope this blog entry is useful!

10 thoughts on “Airflow – Scale-out with Redis and Celery”

  1. Awesome guide! I only wonder whether we need to start the webserver and scheduler on the worker nodes as well. Wouldn’t it be enough to only start “airflow worker -q queue_name” on the worker node? The rest should be done by the Redis broker and Celery.

  2. I have tried everything and it works well, but I’m getting a lot of latency between the tasks. I tried to configure max_threads in airflow.cfg, though it is still slow. I want Airflow to execute the second task immediately after the completion of the first task. Is there any possibility of that?

    • Hi Krish, thanks for your comment.

      A lot of latency may have to do with the infrastructure on which Airflow is deployed. max_threads may not help if you have limited CPU resources.

      Regards
      V

  3. Hi Team,

    I am using Airflow 1.10.12 with Redis, Celery and Postgres, with multiple worker nodes. A task succeeds if I run it on the worker that sits on the webserver host, but if I run the task on a different queue (a different worker node) it fails. In the Postgres DB I am not able to see the hostname for that queue; if I run the task on the same queue where the master (webserver) runs, I can see the hostname.

  4. Hello friends,

    Small help needed here. I have a Redis service (in AWS) instead of installing Redis on a separate EC2 instance.

    How do I set the broker URL? I tried it the same way you mentioned in this guide, but it is not connecting.

    Can you please help me? Thanks in advance.
