Spark & Cassandra

Cassandra is a well-known open source NoSQL database. It is a highly scalable, highly available and performant database with no single point of failure. These features make it one of the most widely adopted open source technologies. This post is about how Spark can leverage Cassandra as a data source, both reading from it and writing to it.

Before we move any further, there are a couple of assumptions for this blog post:

  • You have a working installation of Cassandra. If not, then please take a look at this link.
  • You know the basic CQL commands. If not, then please take a look at this link (a short illustrative CQL sketch also follows this list).
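
If you want to follow along with the read example later in this post, you will need the cloudwalker keyspace and a demo_product table. Their definitions are not part of this post, so treat the CQL below as a rough sketch only: the keyspace and table names come from the examples, but the replication settings and the columns are purely hypothetical.

CREATE KEYSPACE IF NOT EXISTS cloudwalker
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

-- Hypothetical columns; the real demo_product schema is not shown in this post
CREATE TABLE IF NOT EXISTS cloudwalker.demo_product(
    prod_id   text,
    prod_name text,
    price     decimal,
    PRIMARY KEY(prod_id)
);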

Setup for Cassandra

Setting up the libraries needed to access Cassandra is relatively easy and can be done by including the DataStax connector in your project. If you are using IntelliJ with SBT, it is as simple as adding the line below to build.sbt

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.0"

For this blog post the build.sbt looked like this

build.sbt
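
A minimal sketch of such a build.sbt is below. The project name, Scala version and Spark version are my assumptions (the 2.4.x line of the connector targets Spark 2.4 and Scala 2.11), so adjust them to match your environment.

name := "spark-cassandra-demo"   // hypothetical project name

scalaVersion := "2.11.12"        // assumed; adjust to the Scala version your Spark build uses

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"                  % "2.4.0",  // assumed Spark version
  "com.datastax.spark" %% "spark-cassandra-connector"  % "2.4.0"
)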

Reading from Cassandra

Once the libraries have been set up, the code for reading data is pretty much standard and straightforward. You just need to provide the following

  • Cassandra node details
  • Keyspace and the table name.

The example below reads data from a Cassandra table.

import org.apache.spark.sql.SparkSession

object SparkCassandra {
  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .config("spark.cassandra.connection.host","127.0.0.1")
      .appName("Spark Cassandra")
      .getOrCreate

    //Step 2 - Read data from Cassandra table
    val prodDS = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace","cloudwalker")
      .option("table","demo_product")
      .load

    //Step 3 - Show the data
    prodDS.show()
  }
}

Let’s analyse the code

  • Step 1 – Creates a Spark session and also tells it where to find the Cassandra node.
  • Step 2 – Reads from Cassandra using the specified format; the data lives in the table demo_product, which is in the cloudwalker keyspace.
  • Step 3 – Shows the data read from Cassandra.
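
Since load returns a regular DataFrame, everything from this point on is plain Spark. As a small follow-up sketch (not part of the original example), you could inspect the schema or query the same data through Spark SQL:

//Inspect the schema inferred from the Cassandra table
prodDS.printSchema()

//Register a temp view and query it with standard Spark SQL
prodDS.createOrReplaceTempView("demo_product")
prodDS.sparkSession.sql("SELECT * FROM demo_product LIMIT 10").show()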

Below is the relevant log.

Cassandra Data in Spark

Writing to Cassandra

Writing to a data sink in Spark is done via the standard write API. Two additional options are passed to the write API

  • Keyspace name
  • Table name

Below is an example of reading a CSV file locally, summarizing the data and populating a Cassandra table with the result.
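
The products_v2.csv file itself is not shown here; since the code groups by product and year and averages revenue, it presumably has a header row along these lines (the data rows below are purely illustrative):

product,year,revenue
widget-a,2018,1200.50
widget-a,2019,1350.00
widget-b,2018,980.25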

Step 1 – Create a table in Cassandra

CREATE TABLE cloudwalker.prod_sales_yearly(
    product text,
    year text,
    total_revenue decimal,
    -- product is the partition key; year is a clustering column within each partition
    PRIMARY KEY(product, year)
);

Step 2 – Populate the table in Cassandra

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkCassandra {
  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.cassandra.connection.host","127.0.0.1")
      .appName("Spark Cassandra")
      .getOrCreate

    //Step 2 - Read data from a local file.
    val baseDS = spark.read
      .option("header","true")
      .option("delimiter",",")
      .csv("products_v2.csv")

    baseDS.show

    //Step 3 - Aggregate data in Spark
    val resultDS = baseDS.groupBy("product","year")
      .agg(round(avg("revenue"),2) as "total_revenue")

    resultDS.show()

    //Step 4 - Write result to Cassandra
    resultDS.write
      .format("org.apache.spark.sql.cassandra")
      .mode("append")
      .option("keyspace","cloudwalker")
      .option("table","prod_sales_yearly")
      .save
  }
}

Let’s analyse the program

  • Step 1 – Creates a Spark session.
  • Step 2 – Reads the CSV data and shows it in the console log.
  • Step 3 – Aggregates the data by product and year.
  • Step 4 – Writes the result to Cassandra.
    • Observe that we specify the format, which tells Spark it is writing to Cassandra.
    • The mode for inserting the data into the table.
    • The keyspace and table name.

Below are the relevant parts of the log

Source data in Spark
Aggregated data in Spark

Below is the output from cqlsh, the command-line tool for Cassandra

Output in cqlsh
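
If you prefer to verify the result from Spark rather than cqlsh, the same read API shown earlier in this post can read the table back:

//Read the freshly written table back into a DataFrame
val checkDS = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace","cloudwalker")
  .option("table","prod_sales_yearly")
  .load

checkDS.orderBy("product","year").show()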

The Spark Cassandra Connector used in this post is a highly configurable library. For a deeper dive into this connector, I suggest you look at this documentation, which is very helpful. Make sure you read about the read and write consistency levels!
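
As one example of that configurability, the read and write consistency levels can be set through the connector's configuration properties when the session is built. Below is a hedged sketch using the property names as I recall them from the connector documentation, so please verify them against the docs and pick levels that suit your cluster:

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")     //consistency level for reads
  .config("spark.cassandra.output.consistency.level", "LOCAL_QUORUM") //consistency level for writes
  .appName("Spark Cassandra")
  .getOrCreate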

That brings us to the end of this blog post. I hope you have found it useful. Try using S3 as a source and Cassandra as a sink for the processed data.

Till then…Byeeeeeeeeee!
