Spark RDDs Performance Tuning – Partitioning

Now that we have gone through resource planning, shuffling and caching to tune our Spark application, there are still a couple of areas we have not looked at which can give us some additional performance gains and make our application a bit more efficient.

One of the areas we can look at tuning is how Spark splits its data into partitions and processes them. If the partitions are too small, Spark will spend more time scheduling and launching tasks than actually processing data, and the extra partitions also lead to additional shuffling, so a very granular approach may not be the answer. In the same way, having very big partitions can lead to inefficient use of cluster resources. The answer lies somewhere in between these two extremes.
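As a rough illustration of how one might size partitions, a common rule of thumb is to aim for partitions of roughly 128 MB (about one HDFS block). The helper below is a sketch of that heuristic, not anything Spark provides; the 128 MB target and the 1.2 GB sample input size are assumptions made purely for illustration.

```scala
// Sketch of a partition-sizing heuristic: aim for ~128 MB per partition.
// The 128 MB target and the sample input size are illustrative assumptions,
// not values taken from Spark or from our application.
def targetPartitions(totalBytes: Long,
                     bytesPerPartition: Long = 128L * 1024 * 1024): Int =
  math.max(1, math.ceil(totalBytes.toDouble / bytesPerPartition).toInt)

// A hypothetical 1.2 GB input would land at 10 partitions of ~120 MB each.
println(targetPartitions(1200L * 1024 * 1024)) // prints 10
```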

If you have run the code in any of the previous posts you will have observed that the Spark cluster creates 39 partitions, and therefore 39 tasks, for our Spark application.

Let’s first see the code, which prints the initial number of partitions and shows how we can repartition/coalesce the data to a different number of partitions to achieve an improvement.

/*
  Queries for benchmarking
    1. Count the total number of citations
    2. Number of citations by violation code
    3. Number of distinct routes
    4. Amount collected by route
    5. Amount collected by violation code
*/


import org.apache.spark.{SparkConf, SparkContext}

object SparkTuningPartitions  {
  def main(args: Array[String]): Unit = {
    val sparkConf  = new SparkConf()
      .setAppName("SparkTuningPartitions") //Name of our spark app
     // .setMaster("local[*]")

    val sparkContext = new SparkContext(sparkConf)


    //Set logger to report only errors (uncomment to change the verbosity)
    //sparkContext.setLogLevel("ERROR")

    //Step 1 - Transformation - Read a text file using the Spark Context and create an rdd.
    val linesRdd = sparkContext.textFile(args(0))

    println(s"num of partitions linesRdd ${linesRdd.getNumPartitions}")
    //Step 2 - Transformation - Split each line into different attributes using - map method
    val dataRDD = linesRdd.filter(x => !x.isEmpty)
      .map(_.split(","))
      .map(p=>(p(12),p(14),p(16))) //Reduce the data set to be shuffled
      .coalesce(args(1).toInt)
      .cache //cache the data in the memory for re-use

    println(s"num of partitions dataRdd ${dataRDD.getNumPartitions}")

    //Step 3 - Query - 1 - Count total number of citations[Spark Action]
    println("Query 1 - Count of Rows using complete rows :"+dataRDD.count())

    //Step 4 - Query - 2 -  Number of citations by violation code
    //Extract violation code and form key,value pair rdd.
    val q2BaseRDD = dataRDD
      .map(x=>(x._2,1))
      .reduceByKey((x,y)=>x+y)

    println("Query 2 - Number of citations by violation code:")
    q2BaseRDD.collect.foreach(t=>println(s"Violation Code ${t._1}, Citation Count: ${t._2}"))


    //Step 5 - Query - 3 - Number of distinct routes
    val q3BaseRDD = dataRDD.map(_._1)
    println("Query 3 - Number of distinct routes:"+q3BaseRDD.distinct.count)


    //Step 6 - Query - 4 - Amount collected by route
    //Remove any rows where amount & route is blank
    val q4BaseRDD = dataRDD.filter(_._1!="").filter(_._3!="")

    val q4ResultRDD = q4BaseRDD
      .map(x=>(x._1, try {x._3.toInt} catch{case e: Exception => 0} ))
      .reduceByKey((x,y)=>x+y)


    //Spark Action
    println("Query 4 - Amount collected by route:")
    q4ResultRDD.collect.foreach(t=>println(s"Route Code ${t._1}, Amount Collected: ${t._2}"))

    //Step 7 - Query 5 - Amount collected by violation code
    //Use Query 4 base RDD, Remove any rows where the amount is blank

    val q5ResultRDD = q4BaseRDD
      .map(x=>(x._2, try {x._3.toFloat} catch {case e: Exception => 0f})) //guard against non-numeric amounts
      .reduceByKey((x,y)=>x+y)

    println("Query 5 - Amount collected by violation code:")
    q5ResultRDD.collect.foreach(t=>println(s"Violation Code ${t._1}, Amount Collected: ${t._2}"))

  }
}

Observe that we have not used the partitionBy method, because it is only available on pair RDDs and the RDD we want to re-partition above is not a pair RDD. There is a lot of confusion around this point, and the distinction is good to know. So we have to think of something else.
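For context on what partitionBy would do: it takes a Partitioner, and on a pair RDD such as q2BaseRDD it would look like q2BaseRDD.partitionBy(new HashPartitioner(8)). The snippet below is a plain-Scala sketch that mirrors, rather than calls, what Spark's HashPartitioner does when assigning a key to a partition (the key's hashCode modulo the partition count, forced non-negative); the key "VIO123" is a made-up example value.

```scala
// Plain-Scala sketch of hash partitioning, mirroring (not calling) what
// Spark's HashPartitioner does on a pair RDD via rdd.partitionBy(...):
// the key's hashCode modulo the partition count, adjusted to be non-negative.
def hashPartition(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

// Any key deterministically lands in a partition between 0 and numPartitions-1,
// so all records for one violation code would end up in the same partition.
println(hashPartition("VIO123", 8))
```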

We know from the logs in our previous posts that 39 partitions are being generated; hence the use of the coalesce method instead of the repartition method. We want to reduce the number of partitions we are operating on, and currently there are too many and they are too small.
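Why coalesce rather than repartition? coalesce(n), with its default shuffle = false, merges existing partitions into fewer, so records never have to leave their executor, while repartition(n) redistributes every record through a full shuffle. The snippet below is a simplified plain-Scala illustration of that merging idea, not Spark's actual coalesce algorithm (which also balances groups for data locality):

```scala
// Simplified sketch of coalesce-style merging: 39 old partitions are grouped
// into 8 new ones, each new partition being a union of consecutive old ones.
// No record crosses group boundaries, which is why no full shuffle is needed.
val oldPartitions = (0 until 39).toList
val target = 8
val groups = oldPartitions.groupBy(i => i * target / 39)

println(groups.size)                // prints 8  (merged partition count)
println(groups.values.flatten.size) // prints 39 (every old partition is kept)
```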

In the spark-submit command below we are asking Spark to coalesce the data down to only 4 partitions and run the code. This is the other end of the extreme, where we have a few really big partitions.

spark-submit --class SparkTuningPartitions --executor-cores 2  --executor-memory 2g --num-executors 4 sparktuningpartitions.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv 4

The result is shown below in the logs of the history server. The run has taken more time than in our previous post.

Observe that we are still being hit by the JVM garbage-collection issue. More on that a bit lower down in the post (fixing it saves time as well!).

Let’s do a spark-submit again, but this time we will have 8 partitions instead of 4. See below.

spark-submit --class SparkTuningPartitions --executor-cores 2  --executor-memory 2g --num-executors 4 sparktuningpartitions.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv 8

The result is shown below in the logs of the history server: a slight improvement on the earlier result, but nothing much to talk about.

Finally, we come to our nagging issue of garbage collection. If you want to learn more about it, there is an amazing post by Databricks which really says it all. It’s an old post, but it is a good place to start and get your head around garbage collection in Spark. Most importantly, it is an easy read for the subject it addresses. Additionally, here is an excellent post on tuning.

Before going forward, let’s see which garbage collector was used for our previous run. It can be obtained from the Environment tab of the Spark UI.

The garbage collector is Concurrent Mark Sweep (CMS). Let’s change the garbage collector to G1, which is generally more efficient for this kind of workload. See the spark-submit command below; it has a few more parameters to pass, but there are no changes to the application code.

spark-submit --class SparkTuningPartitions --executor-cores 2  --executor-memory 2g --num-executors 4 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy" sparktuningpartitions.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv 8
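If you would rather not repeat these JVM flags on every spark-submit, the same setting can live in spark-defaults.conf. The fragment below is simply an equivalent sketch of the --conf flag used above, nothing new; note that the GC-logging flags (-XX:+PrintGCDetails and friends) are the JDK 8 forms, which were replaced by the unified -Xlog:gc* option in JDK 9 and later.

```
# spark-defaults.conf equivalent of the --conf flag used above
spark.executor.extraJavaOptions  -XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
```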

The results are quite interesting, as the change knocks a few seconds off our runtime.

Here is a quick summary:

  1. Shuffling went down from 1.2 MB to 365 KB when we reduced the number of partitions, because more of the data stayed local to the nodes.
  2. We knocked an additional 10% off our last run time; we are now running the same application in 54 seconds.
  3. Our garbage collection time has come down by half.
  4. Overall, we have brought the runtime down from 4 minutes to 54 seconds.
