Spark & Redis

One of the fantastic use cases of Redis is alongside Apache Spark, the in-memory computation engine. You can, in a sense, use it as a backend to persist Spark objects (data frames, datasets or RDDs) in the Redis cache alongside other cached data. To enable this there is a very handy library called Spark-Redis, which provides both Scala and Python APIs.

Redis can be used to persist data and act as a shared backend: common data can be shared between jobs rather than being loaded again and again. This makes Redis an invaluable tool for big data developers.

In this blog post, we will use both the Scala and Python APIs to read and write data frames and RDDs to/from Redis.

Using Scala API

In this section, we will read and write to a Redis cluster using Scala and Spark. Below is the build.sbt to be used:

name := "RedisSpark"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
libraryDependencies += "com.redislabs" % "spark-redis" % "2.4.0"

Write to Redis using Spark

The Spark program reads a simple CSV file to create a data frame, adds an additional column using Spark SQL, and finally writes the data frame to the Redis cluster.

import org.apache.spark.sql.SparkSession

object SparkApp {
  def main(args: Array[String]): Unit = {
    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.redis.host", "localhost")
      .config("spark.redis.port", "6379")
      .appName("Spark Writer")
      .getOrCreate

    //Step 2 - Read data from a local file & add a row number
    val baseDS = spark.read
      .option("header","true")
      .option("delimiter",",")
      .csv("products_v2.csv")

    baseDS.createOrReplaceTempView("base")

    val productDS =  spark.sql("select row_number() over(partition by product order by product desc) as productid, * from base")

    //Step 3 - Write the data to redis cluster
    productDS.write
      .format("org.apache.spark.sql.redis")
      .option("table", "products")
      .save()

    //Step 4 - Stop spark session
    spark.stop()
    System.exit(0)
  }
}
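
A quick aside on how the rows are keyed: by default spark-redis generates an identifier for each row, but you can ask it to key the hashes on one of your own columns instead. A minimal sketch, assuming your spark-redis version supports the key.column option, using the productid column we just derived:

    //Alternative to Step 3 - key each hash on the productid column
    productDS.write
      .format("org.apache.spark.sql.redis")
      .option("table", "products")
      .option("key.column", "productid")
      .save()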

Before we execute the program, let's take a look at the Redis server and check whether there are any hashes whose keys start with products. See below:

keys products*
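
On a fresh instance this should come back empty (redis-cli reports something like (empty list or set), depending on the version).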

Let’s run our small Scala/Spark program, which reads a file products_v2.csv. A small portion of the log is below

Let's look at the Redis server again via the Redis CLI, using the same command as earlier. It now shows 88475 records, which matches the number of records in our CSV file.
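
If you would rather verify the count from the shell than eyeball the keys output, a scan-based count works too. A small sketch, assuming redis-cli is on your path and nothing else in the database uses the products: prefix:

redis-cli --scan --pattern "products:*" | wc -l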

Reading from Redis using Spark

In this section, a separate Spark program reads from the Redis cluster, counts the number of rows and displays the results.

import org.apache.spark.sql.SparkSession

object SparkAgg {
  def main(args: Array[String]): Unit = {
    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.redis.host", "localhost")
      .config("spark.redis.port", "6379")
      .appName("My Spark Agg")
      .getOrCreate

    //Step 2 - Read products from redis cluster and create a dataframe
    val baseDS = spark.read
      .format("org.apache.spark.sql.redis")
      .option("table","products")
      .load

    //Step 3 - Show the rows
    baseDS.show(10)

    //Step 4 - Count the rows
    println(s"Number of rows:${baseDS.count}")

    //Step 5 - Stop Spark & Exit
    spark.stop()
    System.exit(0)
  }
}

The relevant portions of the output are below
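
Before moving on to Python, note that the dataframe read back from Redis behaves like any other, so richer aggregations work just as well. Below is a small sketch, assuming the product column from the original CSV is present after the round trip:

    import org.apache.spark.sql.functions.desc

    //Optional - count rows per product, straight from the Redis-backed dataframe
    baseDS.groupBy("product")
      .count()
      .orderBy(desc("count"))
      .show(10)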

Using Python API

Writing the code in Python is equally simple, but it requires a jar to be built and placed on the Java classpath. The following steps walk through that.

Creating the JAR

Step 1 – Install maven

This is a simple step. If you are using Ubuntu you can run the following command:

sudo apt-get install maven
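
You can confirm the installation (and which JDK Maven has picked up) with:

mvn -version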

Step 2 – Download or clone the Spark-Redis library from GitHub

The step below shows how to clone the repository using git, which gets you the latest code.

git clone https://github.com/RedisLabs/spark-redis.git

You can also download a specific release from https://github.com/RedisLabs/spark-redis/releases if you are after a particular version.

Step 3 – Build the jar

Unzip the files (or change into the cloned directory) and run the Maven build:

mvn clean package -DskipTests

Note – make sure you have JDK 1.8 installed, not a higher version! The build should download all the required libraries and produce our jar.

Once the jar is built, navigate to the target directory and pick the file matching the pattern spark-redis-<version>-jar-with-dependencies.jar. This file needs to be on your Java classpath, or you can copy it into the site-packages/pyspark/jars directory of your virtual env.

This jar can also be supplied when launching the job with spark-submit instead of being copied into the pyspark installation.
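
For example, via the --jars option (write_to_redis.py is just a hypothetical name for the script shown below):

spark-submit --jars spark-redis-<version>-jar-with-dependencies.jar write_to_redis.py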

Write to Redis using Spark

Below is the Python code which reads a CSV file, adds an additional column using Spark SQL, and finally writes the data frame to the Redis cluster.

from pyspark.sql import SparkSession

# Step 1 - Create a spark session
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.redis.host", "localhost") \
    .config("spark.redis.port", "6379") \
    .appName("Spark Writer") \
    .getOrCreate()

# Step 2 - Read data from a local file & add a row number
base_ds = spark.read \
    .option("header", "true") \
    .option("delimiter", ",") \
    .csv("products_v2.csv")

base_ds.createOrReplaceTempView("base")

product_ds = spark.sql("select row_number() over(partition by product order by product desc) as productid, * from base")

# Step 3 - Write the data to redis cluster
product_ds.write \
    .format("org.apache.spark.sql.redis") \
    .option("table", "products") \
    .save()

# Step 4 - Stop spark session
spark.stop()

Before the execution, let's check the Redis cluster with the same keys products* command as earlier.


The output of the Python program is below

If we fire the same command again, it now shows 88475 records, which matches the number of records in our CSV file.

Read from Redis using Spark

In this section, a separate Python Spark program reads from the Redis cluster, counts the number of rows and displays the results.

from pyspark.sql import SparkSession

# Step 1 - Create a spark session
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.redis.host", "localhost") \
    .config("spark.redis.port", "6379") \
    .appName("Spark Writer") \
    .getOrCreate()

# Step 2 - Read data from redis-server
base_ds = spark.read \
    .format("org.apache.spark.sql.redis") \
    .option("table", "products") \
    .load()

base_ds.show()

# Step 3 - Aggregate data
print "Number of products:"+str(base_ds.count())

# Step 4 - Stop spark session
spark.stop()

The relevant portion of the output is below

The total number of products matches the data in our products file.

This brings us to the end of this blog post on using Spark and Redis. I hope you find this post useful. Till next time…. byeeeeee!
