Cassandra is a well-known open source NoSQL database. It is a highly scalable, highly available and performant database with no single point of failure. These features make it one of the most widely adopted open source technologies. This post is about how Spark can leverage Cassandra as a data source, both for reading from it and writing to it.
Before we move any further, there are a couple of assumptions for this blog post:
- You have a working copy of Cassandra. If not, please take a look at this link.
- You have some knowledge of CQL commands. If not, please take a look at this link.
Setup for Cassandra
Setting up the libraries to access Cassandra is relatively easy and can be done by including the Spark Cassandra Connector as a dependency.
For this blog post, the build.sbt pulled in Spark SQL and the connector.
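A minimal build.sbt along these lines should work; the exact version numbers below are illustrative and should be matched to your Spark installation.
name := "spark-cassandra-demo"
version := "0.1"
scalaVersion := "2.11.12"

// Spark SQL plus the DataStax Spark Cassandra Connector (versions are illustrative)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.5",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.3"
)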

Reading from Cassandra
Once the libraries have been set up, the code for reading data is pretty much standard and straightforward. You just need to provide the following:
- Cassandra node details
- Keyspace and the table name.
The example below reads data from a Cassandra table.
import org.apache.spark.sql.SparkSession

object SparkCassandra {
  def main(args: Array[String]): Unit = {
    //Step 1 - Create a spark session and point it at the Cassandra node
    val sqlContext: SparkSession = SparkSession.builder()
      .master("local[*]")
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .appName("Spark Cassandra")
      .getOrCreate

    //Step 2 - Read data from the Cassandra table
    val prodDS = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace", "cloudwalker")
      .option("table", "demo_product")
      .load

    //Step 3 - Show the data
    prodDS.show()
  }
}
Let’s analyse the code
- Step 1 – Creates a Spark session and also tells it where to find the Cassandra node.
- Step 2 – Reads from Cassandra using the specified format; the data comes from the table demo_product, which is in the cloudwalker keyspace.
- Step 3 – Shows the data read from Cassandra.
Below is the relevant log.

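Once loaded, prodDS is just a regular DataFrame, so the usual Spark operations apply to it. As a small sketch, you could register it as a temporary view and query it with Spark SQL:
//Register the Cassandra-backed DataFrame as a temporary view
prodDS.createOrReplaceTempView("demo_product")

//Query it like any other Spark table
sqlContext.sql("SELECT * FROM demo_product").show()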
Writing to Cassandra
Writing to a data sink in Spark is done via the standard write API. The following additional options are passed to it:
- Keyspace name
- Table name
Below is an example of reading a CSV file locally, summarizing the data and writing the result to a Cassandra table.
Step 1 – Create a table in Cassandra
CREATE TABLE cloudwalker.prod_sales_yearly (
  product text,
  year text,
  total_revenue decimal,
  PRIMARY KEY(product, year)
);
Step 2 – Populate the table in Cassandra
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkCassandra {
  def main(args: Array[String]): Unit = {
    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .appName("Spark Cassandra")
      .getOrCreate

    //Step 2 - Read data from a local CSV file
    val baseDS = spark.read
      .option("header", "true")
      .option("delimiter", ",")
      .csv("products_v2.csv")
    baseDS.show

    //Step 3 - Aggregate the data in Spark
    val resultDS = baseDS.groupBy("product", "year")
      .agg(round(avg("revenue"), 2) as "total_revenue")
    resultDS.show()

    //Step 4 - Write the result to Cassandra
    resultDS.write
      .format("org.apache.spark.sql.cassandra")
      .mode("append")
      .option("keyspace", "cloudwalker")
      .option("table", "prod_sales_yearly")
      .save
  }
}
Let’s analyse the program
- Step 1 – Creates a Spark session.
- Step 2 – Reads the data and shows it in the console log.
- Step 3 – Aggregates the data.
- Step 4 – Writes the result to Cassandra. Note the options passed to the write API (see the note after this list):
  - the format, which tells Spark that the sink is Cassandra,
  - the save mode used for inserting the data into the table,
  - the keyspace and table name.
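On the save mode: "append" simply inserts the rows into the existing table. The connector also supports "overwrite", which truncates the table first; as a safeguard it expects you to opt in explicitly. The sketch below assumes the connector's confirm.truncate option is available in your version, so check the connector documentation before relying on it:
//Overwrite truncates the target table before writing, so the connector
//requires explicit confirmation via confirm.truncate
resultDS.write
  .format("org.apache.spark.sql.cassandra")
  .mode("overwrite")
  .option("confirm.truncate", "true")
  .option("keyspace", "cloudwalker")
  .option("table", "prod_sales_yearly")
  .save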
Below are the relevant parts of the log


Below is the output from cqlsh, the command-line tool for Cassandra.

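If you prefer to verify the result from Spark itself rather than cqlsh, one option is simply to read the table back with the same connector, for example:
//Read the freshly written table back to verify its contents
val checkDS = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", "cloudwalker")
  .option("table", "prod_sales_yearly")
  .load
checkDS.show()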
The Spark Cassandra Connector used in this post is a highly configurable library. For a deeper dive into this connector, I suggest you look at its documentation, which is very helpful; make sure you read about the configuration options it exposes.
That brings us to the end of this blog post. I hope you have found it useful. As a next step, try using S3 as the source and Cassandra as the sink for the processed data, as sketched below.
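For example, reading the same CSV from S3 would only change the input path; the bucket and path below are hypothetical, and you would also need the Hadoop S3A libraries and AWS credentials configured:
//Hypothetical S3 bucket/path - replace with your own
val baseDS = spark.read
  .option("header", "true")
  .option("delimiter", ",")
  .csv("s3a://my-bucket/products_v2.csv")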
Till then…Byeeeeeeeeee!