Spark & RDBMS

Spark can talk to a wide range of open-source and proprietary RDBMSs via JDBC. If a JDBC driver is available for the RDBMS, it can be used as a source or a sink.

In this blog post, we look at how Spark integrates with the open-source Postgres database. The process is much the same for other RDBMSs.

Setup

JDBC Driver Libraries

Spark accesses the Postgres database via a JDBC driver, so the Postgres driver needs to be added to the development environment. To do this, update build.sbt to include the Postgres driver library in addition to the Spark libraries. See below.

libraryDependencies += "postgresql" % "postgresql" % "9.1-901-1.jdbc4"

The build.sbt for this blog entry should look like this.

build.sbt
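
A minimal sketch is given below; the project name and the Scala and Spark versions are assumptions and should be adjusted to match your environment.

// The project name and versions here are illustrative; use whatever matches your setup.
name := "spark-rdbms"

version := "0.1"

scalaVersion := "2.11.12"

// Spark SQL plus the Postgres JDBC driver used throughout this post.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
libraryDependencies += "postgresql" % "postgresql" % "9.1-901-1.jdbc4"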

Database Setup

To read from the database, we create two tables in the public schema of a Postgres database and load some sample data.

DROP TABLE IF EXISTS emp;

DROP TABLE IF EXISTS dept;

CREATE TABLE emp(
    emp_id  BIGINT,
    emp_name    VARCHAR(30),
    age INT,
    dept_no INT,
    salary  BIGINT
);

CREATE TABLE dept(
    dept_no INT,
    dept_name   VARCHAR(30)
);

INSERT INTO dept(dept_no,dept_name) VALUES(1,'sales');
INSERT INTO dept(dept_no,dept_name) VALUES(2,'marketing');
INSERT INTO dept(dept_no,dept_name) VALUES(3,'account');

INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(1,'tom',20,1,3000);
INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(2,'bob',30,2,3000);
INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(3,'mike',20,1,3000);
INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(4,'peter',20,1,3000);
INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(5,'john',30,3,3000);
INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(6,'ray',20,2,3000);

This completes the setup for this blog entry.

Reading from RDBMS

Reading from an RDBMS is no different from reading from a NoSQL store like Cassandra; the only variation is in the options passed to Spark's read API. Let's look at the example below.

import org.apache.spark.sql.SparkSession

object SparkRDBMS {
  def main(args: Array[String]): Unit = {
    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark RDBMS")
      .getOrCreate

    //Step 2 - Read data from postgres.
    val baseDS = spark.read
      .format("jdbc")
      .option("url","jdbc:postgresql://localhost:5432/vipin")
      .option("username","vipin")
      .option("password","welcome")
      .option("dbtable","emp")
      .load

    //Step 3 - Show table data.
    baseDS.show
  }
}

Let’s analyse the code above.

  • Step 1 – Create a Spark session.
  • Step 2 – Read a table from the database.
    • The format is set to jdbc – this tells Spark to read over a JDBC connection.
    • The url option is the JDBC connection string, handled by the driver we added earlier.
    • The user and password options supply the credentials for the database.
    • The dbtable option tells Spark which table to read from.
  • Step 3 – Simply shows the data.

Keep in mind that these reads are lazy. We can now apply any filtering or aggregation we want using the Spark APIs.
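
For instance, here is a quick sketch of filtering and aggregating the emp data we just read; the column names come from the emp table created in the setup.

import org.apache.spark.sql.functions.{avg, count}

//Employees under 30 -- simple filters like this are pushed down to the database where possible.
val youngEmps = baseDS.filter("age < 30")
youngEmps.show

//Head count and average salary per department.
val salaryByDept = baseDS.groupBy("dept_no")
  .agg(count("emp_id") as "emp_count", avg("salary") as "avg_salary")
salaryByDept.show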

Spark provides various parameters for efficient reads; these are documented nicely in the Spark SQL JDBC data source guide. A sketch combining a few of them follows this list.

  • Spark can parallelise reads from a database if an array of predicates is provided.
  • Instead of reading a full table, it is possible to push down a select query.
  • A partition column with upper and lower bounds can be used to parallelise the reads.
  • The number of partitions (numPartitions) can also be specified to increase parallelism during reads.
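
Here is a minimal sketch of a partitioned read and of pushing a query down instead of a full table. The bounds, partition count and subquery are illustrative values based on the emp table above, not recommendations.

//Partitioned read: Spark issues one query per partition over the emp_id range.
val partitionedDS = spark.read
  .format("jdbc")
  .option("url","jdbc:postgresql://localhost:5432/vipin")
  .option("user","vipin")
  .option("password","welcome")
  .option("dbtable","emp")
  .option("partitionColumn","emp_id")
  .option("lowerBound","1")
  .option("upperBound","6")
  .option("numPartitions","3")
  .load

//Query pushdown: a select query can be supplied in place of a table name.
val queryDS = spark.read
  .format("jdbc")
  .option("url","jdbc:postgresql://localhost:5432/vipin")
  .option("user","vipin")
  .option("password","welcome")
  .option("dbtable","(SELECT emp_id, emp_name, salary FROM emp WHERE age < 30) AS emp_subset")
  .load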

Writing to RDBMS

Writing to an RDBMS is slightly more involved, as a few connection options need to be set up before the write. Keep in mind that there is no native Spark support for updates or upserts; custom logic has to be written to enable them.
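
For completeness, here is one possible sketch of a hand-rolled upsert using plain JDBC inside foreachPartition and Postgres' INSERT ... ON CONFLICT. It assumes the target table has a unique constraint on emp_id (which the setup above does not create) and the emp schema from the read example, so treat it as an illustration of the idea rather than the recommended approach.

import java.sql.DriverManager
import org.apache.spark.sql.DataFrame

//Sketch of an upsert: assumes a unique constraint on emp_id and the emp column
//types from the JDBC read above (BIGINT/INT/VARCHAR).
def upsertEmp(df: DataFrame): Unit = {
  df.rdd.foreachPartition { rows =>
    val conn = DriverManager.getConnection(
      "jdbc:postgresql://localhost:5432/vipin", "vipin", "welcome")
    val stmt = conn.prepareStatement(
      "INSERT INTO emp(emp_id, emp_name, age, dept_no, salary) " +
      "VALUES (?, ?, ?, ?, ?) " +
      "ON CONFLICT (emp_id) DO UPDATE SET salary = EXCLUDED.salary")
    try {
      rows.foreach { row =>
        stmt.setLong(1, row.getAs[Long]("emp_id"))
        stmt.setString(2, row.getAs[String]("emp_name"))
        stmt.setInt(3, row.getAs[Int]("age"))
        stmt.setInt(4, row.getAs[Int]("dept_no"))
        stmt.setLong(5, row.getAs[Long]("salary"))
        stmt.addBatch()
      }
      stmt.executeBatch()
    } finally {
      stmt.close()
      conn.close()
    }
  }
}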

Now let's look at the main write example below and then analyse what each of the steps does. Observe that most of the code is similar to what we wrote for Cassandra in the earlier blog post.

import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{avg, round}

object SparkRDBMS {
  def main(args: Array[String]): Unit = {
    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark RDBMS")
      .getOrCreate

    //Step 2 - Read data from a local file.
    val baseDS = spark.read
      .option("header","true")
      .option("delimiter",",")
      .csv("products_v2.csv")
    baseDS.show

    //Step 3 - Aggregate data in Spark
    val resultDS = baseDS.groupBy("product","year")
      .agg(round(avg("revenue"),2) as "total_revenue")
    resultDS.show

    //Step 4 - Setup parameters to connect to Postgres DB
    val url   = "jdbc:postgresql://localhost:5432/vipin"
    val table = "myresult"
    val connectionProperties = new Properties
    connectionProperties.setProperty("user","vipin")
    connectionProperties.setProperty("password","welcome")
    connectionProperties.setProperty("driver","org.postgresql.Driver")

    //Step 5 - Write data to postgres table
    resultDS.write
      .mode(SaveMode.Overwrite)
      .jdbc(url,table,connectionProperties)
  }
}

Let’s analyse the steps below.

  • Step 1 – Create a Spark session.
  • Step 2 – Read data from a local file.
  • Step 3 – Aggregate the data in Spark.
  • Step 4 – Set up the parameters to connect to the Postgres DB.
    • URL – the DB server and port details.
    • Table – the DB table to write to.
    • Connection properties – the username, password and driver information.
  • Step 5 – Write to the Postgres table.
    • Save mode – how the data is saved to the table. There are various save modes like Append, Overwrite etc.; these are documented in the DataFrameWriter API under the mode method.
    • The table is automatically created for us in this case, but you can create it beforehand if you want. A short follow-up sketch appears after this list.
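
As a short follow-up sketch, once myresult exists a later run could append to it instead of replacing it, and the table can be read back to verify the write. This reuses url, table and connectionProperties from Step 4.

//Append to the existing table on a subsequent run instead of overwriting it.
resultDS.write
  .mode(SaveMode.Append)
  .jdbc(url, table, connectionProperties)

//Read the table back to verify the write.
val checkDS = spark.read.jdbc(url, table, connectionProperties)
checkDS.show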

Below are the relevant artefacts from the run: the input file products_v2.csv, the aggregated dataset resultDS, and finally the output in the SQL client once the data has been written to Postgres.

This brings us to the end of how Spark can read from and write to an RDBMS. Hope you have found this post helpful.
