Let’s look at the next step in our performance tuning journey. This one involves changing which APIs are called and how data is stored in distributed memory. Both require code changes: small ones, but they make a real difference. Both topics have been covered to an extent in some of my previous blog posts.
For a quick recap, refer to the links below.
We will use the same dataset and the same transformations as in the previous post, with minor changes (highlighted below), and see how we can reduce shuffling and bring the runtimes down further.
Reducing Shuffling
There are two simple ways of reducing shuffling.
- Reduce the dataset on which the shuffle occurs.
- Switch to a more efficient API.
Reduce dataset size
When doing data analytics, it is usually observed that not all the attributes in the data are required for the analysis the application performs. This means a lot of memory may be occupied by data that is never used, and when a shuffle occurs that unused data gets moved around as well. It therefore makes sense to remove those attributes/fields from the dataset: the program processes less data, and throughput goes up.
In our example, only three attributes are needed to answer our queries, so it makes sense to drop the rest. We go down from 19 columns to just 3. See the code below.
/*
Queries for benchmarking
1. Count the total number of citations
2. Number of citations by violation code
3. Number of distinct routes
4. Amount collected by route
5. Amount collected by violation code
*/
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SparkRddReduceShuffle {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("SparkRddReduceShuffle1") // Name of our Spark app
    val sparkContext = new SparkContext(sparkConf)
    // Set logger to report only errors
    // sparkContext.setLogLevel("ERROR")

    // Step 1 - Transformation - Read a text file using the SparkContext and create an RDD
    val linesRdd = sparkContext.textFile(args(0))

    // Step 2 - Transformation - Split each line into its attributes and keep only the three columns we need
    val dataRDD = linesRdd.filter(x => !x.isEmpty)
      .map(_.split(","))
      .map(p => (p(12), p(14), p(16))) // Reduce the data set to be shuffled

    // Step 3 - Query 1 - Count the total number of citations [Spark action]
    println("Query 1 - Count of Rows using complete rows :" + dataRDD.count())

    // Step 4 - Query 2 - Number of citations by violation code
    // Extract the violation code and form a key/value pair RDD
    val q2BaseRDD = dataRDD
      .map(x => (x._2, 1))
      .groupByKey.map(p => (p._1, p._2.size))
    println("Query 2 - Number of citations by violation code:")
    q2BaseRDD.collect.foreach(t => println(s"Violation Code ${t._1}, Citation Count: ${t._2}"))

    // Step 5 - Query 3 - Number of distinct routes
    val q3BaseRDD = dataRDD.map(_._1)
    println("Query 3 - Number of distinct routes:" + q3BaseRDD.distinct.count)

    // Step 6 - Query 4 - Amount collected by route
    // Remove any rows where the route or the amount is blank
    val q4BaseRDD = dataRDD.filter(_._1 != "").filter(_._3 != "")
    val q4ResultRDD = q4BaseRDD
      .map(x => (x._1, try { x._3.toInt } catch { case e: Exception => 0 }))
      .groupByKey.map(p => (p._1, p._2.sum))
    // Spark action
    println("Query 4 - Amount collected by route:")
    q4ResultRDD.collect.foreach(t => println(s"Route Code ${t._1}, Amount Collected: ${t._2}"))

    // Step 7 - Query 5 - Amount collected by violation code
    // Reuse the Query 4 base RDD, which already has blank routes and amounts removed
    val q5ResultRDD = q4BaseRDD
      .map(x => (x._2, x._3.toFloat))
      .groupByKey.map(p => (p._1, p._2.sum))
    println("Query 5 - Amount collected by violation code:")
    q5ResultRDD.collect.foreach(t => println(s"Violation Code ${t._1}, Amount Collected: ${t._2}"))
  }
}
The spark-submit command used to run the code above is shown below.
The output from the Spark History Server is shown below. Observe that, just by reducing the size of the dataset being processed, the time Spark took to process the entire dataset has come down to 1.5 minutes from 2.0 minutes.

Observe how the amount of shuffled data has gone down from 36 MB to 20.4 MB. It is still high, but it is an improvement over the previous result. See below.

Use a more efficient API
As we already know, groupByKey is not an efficient way of aggregating data: every (key, value) pair is shuffled across the network before any aggregation happens. reduceByKey is more efficient because it combines values on the map side first, which reduces the amount of data shuffled. Let’s replace groupByKey with reduceByKey in our code; a quick sketch of the difference follows, and the full listing is below.
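The sketch below is illustrative only and is not part of the benchmark code; it uses a toy pair RDD and assumes sc is an existing SparkContext.

val pairs = sc.parallelize(Seq(("A", 1), ("A", 1), ("B", 1))) // toy data

// groupByKey ships every (key, value) pair to the reducers and only
// then counts the values for each key.
val countsWithGroupByKey = pairs.groupByKey.map(p => (p._1, p._2.size))

// reduceByKey computes partial sums within each map-side partition first,
// so only one partial result per key per partition crosses the network.
val countsWithReduceByKey = pairs.reduceByKey((x, y) => x + y)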
/*
Queries for benchmarking
1. Count the total number of citations
2. Number of citations by violation code
3. Number of distinct routes
4. Amount collected by route
5. Amount collected by violation code
*/
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SparkRddReduceShuffle {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("SparkRddReduceShuffle2") // Name of our Spark app
      //.setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)
    // Set logger to report only errors
    // sparkContext.setLogLevel("ERROR")

    // Step 1 - Transformation - Read a text file using the SparkContext and create an RDD
    val linesRdd = sparkContext.textFile(args(0))

    // Step 2 - Transformation - Split each line into its attributes and keep only the three columns we need
    val dataRDD = linesRdd.filter(x => !x.isEmpty)
      .map(_.split(","))
      .map(p => (p(12), p(14), p(16))) // Reduce the data set to be shuffled

    // Step 3 - Query 1 - Count the total number of citations [Spark action]
    println("Query 1 - Count of Rows using complete rows :" + dataRDD.count())

    // Step 4 - Query 2 - Number of citations by violation code
    // Extract the violation code and form a key/value pair RDD
    val q2BaseRDD = dataRDD
      .map(x => (x._2, 1))
      .reduceByKey((x, y) => x + y)
    println("Query 2 - Number of citations by violation code:")
    q2BaseRDD.collect.foreach(t => println(s"Violation Code ${t._1}, Citation Count: ${t._2}"))

    // Step 5 - Query 3 - Number of distinct routes
    val q3BaseRDD = dataRDD.map(_._1)
    println("Query 3 - Number of distinct routes:" + q3BaseRDD.distinct.count)

    // Step 6 - Query 4 - Amount collected by route
    // Remove any rows where the route or the amount is blank
    val q4BaseRDD = dataRDD.filter(_._1 != "").filter(_._3 != "")
    val q4ResultRDD = q4BaseRDD
      .map(x => (x._1, try { x._3.toInt } catch { case e: Exception => 0 }))
      .reduceByKey((x, y) => x + y)
    // Spark action
    println("Query 4 - Amount collected by route:")
    q4ResultRDD.collect.foreach(t => println(s"Route Code ${t._1}, Amount Collected: ${t._2}"))

    // Step 7 - Query 5 - Amount collected by violation code
    // Reuse the Query 4 base RDD, which already has blank routes and amounts removed
    val q5ResultRDD = q4BaseRDD
      .map(x => (x._2, x._3.toFloat))
      .reduceByKey((x, y) => x + y)
    println("Query 5 - Amount collected by violation code:")
    q5ResultRDD.collect.foreach(t => println(s"Violation Code ${t._1}, Amount Collected: ${t._2}"))
  }
}
The spark-submit command used is below. Observe that we have used the same amount of cluster resources as before.
The Spark History Server landing page shows that the application finished in 1.3 minutes, down from the 1.5 minutes taken in the previous step.

The executor screen has even more interesting details. The shuffled data has gone down from 20.4 MB to 1.2 MB, a reduction of over 90%, because reduceByKey combines values on the map side before the shuffle. The application is doing the same work while moving far less data.

Caching
So far we have tuned the cluster and made efficient use of its resources. However, one inefficiency remains: we are reading and parsing the same data again and again for every query. Let’s add caching to the mix and see how it makes the application even more efficient.
The code introduces a single call to cache on dataRDD and makes no other change. Note that cache is lazy: the data is actually materialized in memory the first time an action (the count in Query 1) runs. See below.
/*
Queries for benchmarking
1. Count the total number of citations
2. Number of citations by violation code
3. Number of distinct routes
4. Amount collected by route
5. Amount collected by violation code
*/
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SparkRddReduceShuffleWithCache {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("SparkRddReduceShuffle2") // Name of our Spark app
      //.setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)
    // Set logger to report only errors
    // sparkContext.setLogLevel("ERROR")

    // Step 1 - Transformation - Read a text file using the SparkContext and create an RDD
    val linesRdd = sparkContext.textFile(args(0))

    // Step 2 - Transformation - Split each line into its attributes and keep only the three columns we need
    val dataRDD = linesRdd.filter(x => !x.isEmpty)
      .map(_.split(","))
      .map(p => (p(12), p(14), p(16))) // Reduce the data set to be shuffled
      .cache // Cache the data in memory for re-use across the queries

    // Step 3 - Query 1 - Count the total number of citations [Spark action]
    println("Query 1 - Count of Rows using complete rows :" + dataRDD.count())

    // Step 4 - Query 2 - Number of citations by violation code
    // Extract the violation code and form a key/value pair RDD
    val q2BaseRDD = dataRDD
      .map(x => (x._2, 1))
      .reduceByKey((x, y) => x + y)
    println("Query 2 - Number of citations by violation code:")
    q2BaseRDD.collect.foreach(t => println(s"Violation Code ${t._1}, Citation Count: ${t._2}"))

    // Step 5 - Query 3 - Number of distinct routes
    val q3BaseRDD = dataRDD.map(_._1)
    println("Query 3 - Number of distinct routes:" + q3BaseRDD.distinct.count)

    // Step 6 - Query 4 - Amount collected by route
    // Remove any rows where the route or the amount is blank
    val q4BaseRDD = dataRDD.filter(_._1 != "").filter(_._3 != "")
    val q4ResultRDD = q4BaseRDD
      .map(x => (x._1, try { x._3.toInt } catch { case e: Exception => 0 }))
      .reduceByKey((x, y) => x + y)
    // Spark action
    println("Query 4 - Amount collected by route:")
    q4ResultRDD.collect.foreach(t => println(s"Route Code ${t._1}, Amount Collected: ${t._2}"))

    // Step 7 - Query 5 - Amount collected by violation code
    // Reuse the Query 4 base RDD, which already has blank routes and amounts removed
    val q5ResultRDD = q4BaseRDD
      .map(x => (x._2, x._3.toFloat))
      .reduceByKey((x, y) => x + y)
    println("Query 5 - Amount collected by violation code:")
    q5ResultRDD.collect.foreach(t => println(s"Violation Code ${t._1}, Amount Collected: ${t._2}"))
  }
}
The spark-submit command is below.
The Spark History Server shows that the application now runs in just 1 minute, compared to 4 minutes when we started.

The shuffled data is still 1.2 MB and has not changed. Observe, however, that the garbage collection time has gone up slightly, likely because the cached rows now live on the executor heaps as deserialized objects. This is addressed in a later post.
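If the extra GC overhead from caching ever becomes a concern, one option is to persist the data in serialized form rather than as deserialized objects. The snippet below is only a sketch of that alternative; it reuses linesRdd from the listing above and is not part of the benchmarked code.

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY), which keeps the
// cached partitions as deserialized objects on the executor heaps.
// MEMORY_ONLY_SER stores each partition as a serialized byte array instead,
// giving the garbage collector far fewer objects to track, at the cost of
// some CPU for serialization and deserialization.
val dataRDD = linesRdd.filter(x => !x.isEmpty)
  .map(_.split(","))
  .map(p => (p(12), p(14), p(16)))
  .persist(StorageLevel.MEMORY_ONLY_SER)

// Release the cached blocks once the RDD is no longer needed:
// dataRDD.unpersist()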

Summary
The summary of the results is shown below. No magic, just simple changes to the Spark application and how it is submitted.

The next posts will cover partitioning, task sizing, and datasets, how they can be tuned, and how to avoid out-of-memory errors by sizing memory more accurately.