Spark interacts with various traditional open-source and proprietary RDBMSs via JDBC connectivity. If a JDBC driver is available for the RDBMS, it can be used as a source or a sink.
In this blog post, we look at how Spark integrates with the open-source Postgres database. The process is the same, or very similar, for other RDBMSs.
Setup
JDBC Driver Libraries
Spark accesses the Postgres database via a JDBC driver. To enable this, a Postgres driver needs to be added to the development environment: update the build.sbt to include the Postgres driver libraries in addition to the Spark libraries, as shown below.
The build.sbt for this blog entry should look like this.
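A minimal sketch, assuming Spark 3.x on Scala 2.12 and the PostgreSQL JDBC driver (the version numbers below are assumptions, not the exact ones from the original post):

//Versions below are illustrative assumptions
name := "spark-rdbms-example"
version := "0.1"
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.2.1",
  "org.postgresql" % "postgresql" % "42.3.3"
)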

Database Setup
To read from the database, we create the following two tables in the public schema of a Postgres database.
DROP TABLE IF EXISTS emp;
DROP TABLE IF EXISTS dept;
CREATE TABLE emp(
  emp_id BIGINT,
  emp_name VARCHAR(30),
  age INT,
  dept_no INT,
  salary BIGINT
);
CREATE TABLE dept(
  dept_no INT,
  dept_name VARCHAR(30)
);
INSERT INTO dept(dept_no,dept_name) VALUES(1,'sales');
INSERT INTO dept(dept_no,dept_name) VALUES(2,'marketing');
INSERT INTO dept(dept_no,dept_name) VALUES(3,'account');
INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(1,'tom',20,1,3000);
INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(2,'bob',30,2,3000);
INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(3,'mike',20,1,3000);
INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(4,'peter',20,1,3000);
INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(5,'john',30,3,3000);
INSERT INTO emp(emp_id,emp_name,age,dept_no,salary) VALUES(6,'ray',20,2,3000);
This completes the setup for this blog entry.
Reading from RDBMS
Reading from an RDBMS is not very different from reading from the NoSQL database Cassandra; the only variations are in the options passed to Spark’s read API. Let’s look at the example below.
import org.apache.spark.sql.SparkSession

object SparkRDBMS {
  def main(args: Array[String]): Unit = {
    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark RDBMS")
      .getOrCreate()
    //Step 2 - Read data from postgres.
    val baseDS = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/vipin")
      .option("user", "vipin") //the JDBC source expects "user", not "username"
      .option("password", "welcome")
      .option("dbtable", "emp")
      .load()
    //Step 3 - Show table data.
    baseDS.show()
  }
}
Let’s analyse the code above:
- Step 1 – Create a Spark session
- Step 2 – Read a table from the database.
- The format is set to jdbc – this tells Spark that it will read over a JDBC connection
- The url option is the JDBC connection URL handled by the driver included in build.sbt
- The user/password options are the credentials used to connect to the database
- The dbtable option tells Spark which table to read from
- Step 3 – Simply shows the data.
Keep in mind that these operations are all lazy in nature. We can now apply any filtering or aggregation we want using the Spark APIs, for example:
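A minimal sketch of what that could look like on the baseDS read above (the filter and aggregation here are illustrative, not from the original post):

//Requires: import org.apache.spark.sql.functions.avg
//Filter employees older than 25 and compute the average salary per department
val avgSalaryDS = baseDS
  .filter("age > 25")
  .groupBy("dept_no")
  .agg(avg("salary") as "avg_salary")
avgSalaryDS.show()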
Spark provides various parameters for efficient reads. You can find these documented nicely on this link; a short sketch of the options follows the list below.
- Spark can parallelise reads from a database if predicates are provided
- Instead of reading a full table, it is possible to supply a select query
- A partition column with upper and lower bounds can be used to parallelise the reads
- The number of partitions (numPartitions) can also be specified to increase parallelism during reads.
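As a rough sketch, a parallel read using these options could look like the following (the subquery, bounds and partition count are illustrative assumptions):

//Parallel read: Spark issues one query per partition over the emp_id range
val partitionedDS = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/vipin")
  .option("user", "vipin")
  .option("password", "welcome")
  //A select query can be supplied instead of a plain table name
  .option("dbtable", "(SELECT emp_id, emp_name, dept_no, salary FROM emp) AS emp_subset")
  .option("partitionColumn", "emp_id")
  .option("lowerBound", "1")
  .option("upperBound", "6")
  .option("numPartitions", "3")
  .load()
partitionedDS.show()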
Writing to RDBMS
Writing to an RDBMS is slightly more involved, as various options need to be set before writing. Keep in mind that there is no native Spark support for updates or upserts; custom logic would have to be written to enable upserts, as sketched below.
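As an illustration only, here is a minimal sketch of what such custom upsert logic could look like, using plain JDBC and PostgreSQL's INSERT ... ON CONFLICT inside foreachPartition. The DataFrame updatesDS, its columns and the unique constraint on dept_no are assumptions for this sketch, not part of the original post:

import java.sql.DriverManager
import org.apache.spark.sql.Row

//updatesDS is a hypothetical DataFrame with dept_no and dept_name columns;
//the upsert assumes dept_no has a unique or primary key constraint in Postgres
updatesDS.foreachPartition { (rows: Iterator[Row]) =>
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://localhost:5432/vipin", "vipin", "welcome")
  val stmt = conn.prepareStatement(
    "INSERT INTO dept(dept_no, dept_name) VALUES (?, ?) " +
    "ON CONFLICT (dept_no) DO UPDATE SET dept_name = EXCLUDED.dept_name")
  try {
    rows.foreach { row =>
      stmt.setInt(1, row.getAs[Int]("dept_no"))
      stmt.setString(2, row.getAs[String]("dept_name"))
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}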
With that caveat noted, let’s look at the write example below and then analyse what each of the steps does. Observe that most of the code is similar to the code written for Cassandra in the earlier blog post.
import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{avg, round}

object SparkRDBMS {
  def main(args: Array[String]): Unit = {
    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark RDBMS")
      .getOrCreate()
    //Step 2 - Read data from a local file.
    val baseDS = spark.read
      .option("header", "true")
      .option("delimiter", ",")
      .csv("products_v2.csv")
    baseDS.show()
    //Step 3 - Aggregate data in Spark
    val resultDS = baseDS.groupBy("product", "year")
      .agg(round(avg("revenue"), 2) as "total_revenue")
    resultDS.show()
    //Step 4 - Setup parameters to connect to Postgres DB
    val url = "jdbc:postgresql://localhost:5432/vipin"
    val table = "myresult"
    val connectionProperties = new Properties
    connectionProperties.setProperty("user", "vipin")
    connectionProperties.setProperty("password", "welcome")
    connectionProperties.setProperty("driver", "org.postgresql.Driver")
    //Step 5 - Write data to postgres table
    resultDS.write
      .mode(SaveMode.Overwrite)
      .jdbc(url, table, connectionProperties)
  }
}
Let’s analyse the steps below:
- Step 1 – Create a Spark session
- Step 2 – Read data from a local file.
- Step 3 – Aggregate the data in Spark
- Step 4 – Set up the parameters to connect to the Postgres DB
- URL – the DB server, port and database details
- Table – the DB table to write to
- Connection properties – the username, password and driver information.
- Step 5 – Write to the Postgres table
- Save mode – how the data is saved to the table. There are various save modes like Append, Overwrite etc. These are documented on this link; look for the mode method. A short Append sketch follows this list.
- The table is automatically created for us in this case, but you can create it beforehand if you want.
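For example, to add new rows to an existing table instead of replacing it, the same write could be run with SaveMode.Append (a sketch using the same url, table and connectionProperties as above):

//Append new rows to the existing myresult table instead of overwriting it
resultDS.write
  .mode(SaveMode.Append)
  .jdbc(url, table, connectionProperties)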
Below are the relevant parts of the logs and the output from the SQL client.
[Screenshot: aggregated data in the dataset]
[Screenshot: the data written to Postgres]
This brings us to the end of how Spark can read from and write to an RDBMS. Hope you have found this post helpful.