Spark RDDs Performance Tuning – Resource Planning

Introduction

Tuning Spark jobs can dramatically improve performance and help squeeze more out of the resources at hand. This post is a high-level overview of how to tune Spark jobs and covers the various tools at your disposal. Performance tuning may be black magic to some, but to most engineers it comes down to:

  • How many resources are provided
  • How well the provided resources are used
  • How you write your code

Before you can tune a Spark job, it is important to identify where the potential performance bottlenecks are:

  • Resources – Memory, CPU Cores and Executors
  • Partitioning & Parallelism
  • Long-running straggling tasks
  • Caching

To help with the above areas, Spark provides (and has access to) some tools which can be used for tuning:

  • Resource Manager UI (for example, YARN)
  • Spark Web UI & History Server
  • Tungsten & Catalyst Optimizers
  • Explain Plan

For the purpose of this post, we use a pre-cleaned dataset of Los Angeles parking citations, available on the Kaggle website. It is about 2.6 GB, which should allow us to run our tests. The tests consist of queries which we execute on the dataset to get some answers. The code is written in Scala and tested on an AWS EMR cluster.

The resource tuning is done in a few steps and there is no magic 😉

  • Step 1 – Modify /etc/spark/conf/spark-defaults.conf
    • Set spark.dynamicAllocation.enabled to false
    • Set spark.executor.instances to 1
    • Set spark.executor.memory to 1g
    • Set spark.executor.cores to 1
  • Step 2 – Inspect the configuration of the cluster
    • Identify resources available to Resource Manager – YARN for the purpose of this blog entry
      • Total CPU cores available
      • Total Memory available
      • Max/Min number of CPU cores available to YARN
      • Max/Min memory available to YARN
      • Identify YARN memory overhead
  • Step 3 – Run the Spark application using minimal configuration settings to get a baseline which can be used to tune the resources.
  • Step 4 – Create resource plans with the following parameters, to be passed at runtime (an example invocation is shown after this list).
    • spark.executor.instances
    • spark.executor.memory
    • spark.executor.cores
  • Step 5 – Run your application with various resource plans and record the outcome
  • Step 6 – Select the plan with the best mix of resources and run time.
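These properties map directly to spark-submit flags (--num-executors, --executor-cores, --executor-memory). As a sketch of Step 4, a resource plan can be passed at runtime like this, using the jar and input path from later in this post:

spark-submit --class SparkRddBenchMark --num-executors 2 --executor-cores 1 --executor-memory 1g sparkrddbenchmark.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv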

In addition to tuning the resources for a Spark application, there are other activities which can be done; these are discussed in subsequent posts:

  • Tuning Caching & JVMs
  • Tuning Spark application
  • Tuning Shuffles and Partitioning

A similar process can be applied to Spark Datasets and Spark SQL. As we go through this process, I will try to compare the same queries using the different Spark APIs.
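To give a flavour of that comparison, below is a minimal sketch of the first benchmark query (a plain row count) using the Dataset API. The app name is hypothetical; the input path is the one used later in this post.

import org.apache.spark.sql.SparkSession

//Obtain (or create) a SparkSession - the entry point for the Dataset/SQL API
val spark = SparkSession.builder().appName("SparkDsBenchMark").getOrCreate()

//Read the citations file and count the rows, as Query 1 does with RDDs
val citations = spark.read.csv("s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv")
println(s"Query 1 - Count of Rows: ${citations.count()}")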

This is the first of the blogs on Spark performance tuning and focuses just on Spark and YARN resources but subsequent entries talk about how we can build on top of this.

We will start with RDDs, move to Spark Datasets, and finally apply Spark SQL. Each of these APIs performs differently on the same dataset and hence needs to be tuned slightly differently from the others. Also, just to make things interesting, not all tools are available to all the APIs.

Modify spark-defaults.conf

Make the changes to spark-defaults.conf. My config file looks like this:

spark.dynamicAllocation.enabled false
spark.executor.instances 1
spark.executor.memory 1g
spark.executor.cores 1

These allow a program to run with minimal default resource settings.

Note: spark.yarn.executor.memoryOverhead is the extra memory requested for each YARN container on top of the executor memory, by default a percentage of the executor memory; the rest of the container is used by the Spark executor/driver itself.
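A quick way to sanity-check that these defaults were picked up is to inspect the effective configuration from a spark-shell session on the cluster (a sketch):

//Print every explicitly-set property of the running SparkContext
sc.getConf.getAll.sorted.foreach(println)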

Cluster Configuration

For the purposes of this entry, we are using an AWS EMR m4.large cluster with the following hardware configuration: 1 master and 4 slave nodes, each with 4 vCores and 8 GB of RAM. All our data is stored on S3.

Summary of hardware resources about the cluster

Property Name          Value
Total Number of CPUs   20
Total Memory           40 GB

Resource available to YARN

The YARN configuration is available in various ways. On the cluster nodes it is at /etc/hadoop/conf/yarn-site.xml.

Another place to see the YARN configuration is via the browser, in the YARN Resource Manager UI:

YARN Resource Manager UI->Tools->Configuration

A small summary is also available on the YARN Resource Manager UI landing page

Now let’s look at the resources available to YARN:

Property Name                  YARN Property                               Value
Total Number of CPUs                                                       16
Total Memory Available                                                     24 GB
Minimum Memory for Container   yarn.scheduler.minimum-allocation-mb        32 MB
Maximum Memory for Container   yarn.scheduler.maximum-allocation-mb        6 GB
Minimum CPUs for Container     yarn.scheduler.minimum-allocation-vcores    1
Maximum CPUs for Container     yarn.scheduler.maximum-allocation-vcores    4

Note – The maximum size of a YARN container is 6 GB, which means Spark’s executor size must be less than 6 GB, because the container needs some of that memory as overhead. If you request 6 GB as executor or driver memory, chances are you will get an out-of-memory error and your application will fail.

YARN does not add the memory for the master node to its resources.
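To make the container-size note concrete, here is a rough sizing check. It assumes the common Spark-on-YARN default overhead of max(384 MB, 10% of executor memory); check spark.yarn.executor.memoryOverhead on your cluster for the actual value.

//A sketch: does a 5 GB executor fit in a 6 GB YARN container?
val containerMaxMb = 6 * 1024                                     //yarn.scheduler.maximum-allocation-mb
val executorMemMb  = 5 * 1024                                     //what we pass as --executor-memory
val overheadMb     = math.max(384, (executorMemMb * 0.10).toInt)  //assumed default overhead rule
println(s"$executorMemMb + $overheadMb <= $containerMaxMb ? " +
  (executorMemMb + overheadMb <= containerMaxMb))                 //5120 + 512 <= 6144 -> true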

Run Spark Application – Minimal Configuration

This will allow us to set a baseline and gives us a starting point. We will try to improve on it incrementally and see if we can run the Spark program more efficiently. See the program below, used for this blog entry.

/*
  Queries for benchmarking
    1. Count the total number of citations
    2. Number of citations by violation code
    3. Number of distinct routes
    4. Amount collected by route
    5. Amount collected by violation code
 */


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SparkRddBenchMark {
  def main(args: Array[String]): Unit = {
    val sparkConf  = new SparkConf()
      .setAppName("SparkRddBenchMark") //Name of our spark app

    val sparkContext = new SparkContext(sparkConf)

    //Set Logger to report only Errors
    //sparkContext.setLogLevel("ERROR")

    //Step 1 - Transformation - Read a text file using the Spark Context and create an rdd.
    val linesRdd = sparkContext.textFile(args(0))

    //Step 2 - Transformation - Drop empty lines, then split each line into attributes
    //Note: a plain split(",") does not handle quoted or embedded commas; that should
    //be acceptable here because the dataset has been pre-cleaned
    val dataRDD = linesRdd.filter(x => !x.isEmpty)
      .map(_.split(","))

    //Step 3 - Query - 1 - Count total number of citations[Spark Action]
    println("Query 1 - Count of Rows using complete rows :"+dataRDD.count())

    //Step 4 - Query - 2 -  Number of citations by violation code
    //Extract violation code and form key,value pair rdd.
    val q2BaseRDD = dataRDD
      .map(x=>(x(14),1))
      .groupByKey.map(p=>(p._1,p._2.size))

    println("Query 2 - Number of citations by violation code:")
    q2BaseRDD.collect.foreach(t=>println(s"Violation Code ${t._1}, Citation Count: ${t._2}"))


    //Step 5 - Query - 3 - Number of distinct routes
    val q3BaseRDD = dataRDD.map(_(12))
    println("Query 3 - Number of distinct routes:"+q3BaseRDD.distinct.count)

    //Step 6 - Query - 4 - Amount collected by route
    //Remove any rows where amount is blank
    val q4BaseRDD = dataRDD.filter(_(16)!="")

    val q4ResultRDD = q4BaseRDD
      .map(x=>(x(12), try {x(16).toInt} catch{case e: Exception => 0} ))
      .groupByKey.map(p=>(p._1,p._2.sum))

    //Spark Action
    println("Query 4 - Amount collected by route:")
    q4ResultRDD.collect.foreach(t=>println(s"Route Code ${t._1}, Amount Collected: ${t._2}"))

    //Step 7 - Query 5 - Amount collected by violation code
    //Reuse the Query 4 base RDD, which already excludes rows with a blank amount,
    //and guard the parse the same way Query 4 does

    val q5ResultRDD = q4BaseRDD
      .map(x=>(x(14), try {x(16).toFloat} catch{case e: Exception => 0f}))
      .groupByKey.map(p=>(p._1,p._2.sum))

    println("Query 5 - Amount collected by violation code:")
    q5ResultRDD.collect.foreach(t=>println(s"Violation Code ${t._1}, Amount Collected: ${t._2}"))
  }
}

It can be executed using the following command

spark-submit --class SparkRddBenchMark --executor-cores 1  --executor-memory 1g sparkrddbenchmark.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv

When this is executed on the cluster, it creates 2 executors and completes in 4.0 minutes! Note that the groupByKey method is used; see below.

The Executors tab shows that two executors were launched.

Note: Observe the amount of shuffling being reported due to the use of groupByKey
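Tuning the application code itself is the subject of a later entry, but for reference, the shuffle seen here can usually be reduced by swapping groupByKey for reduceByKey, which combines values on the map side before any data crosses the network. A minimal sketch for Query 2:

    //Counts are combined per key locally first, so far less data is shuffled
    val q2Reduced = dataRDD
      .map(x => (x(14), 1))
      .reduceByKey(_ + _)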

Create Resource plans for execution

Below are some of the resource plans I came up with to test our application. We could add parameters such as parallelism to make them more precise, but for this blog post this should suffice; parallelism is discussed in more detail in a subsequent blog entry.

Use resource plans for execution

Tuning a cluster is about providing the right type and amount of resources to the Spark application. Providing more memory, more CPUs, or more executors may not solve the problem, and may even exacerbate the issue you are trying to solve.

Brute force does not always work

Let’s take our resource plans one by one and execute them. At the end there is a summary comparing the results.

Resource Plan 1 – Min resources

Property Name         spark-submit Flag   Value
Number of Cores       executor-cores      1 CPU
Memory                executor-memory     1 GB
Number of executors   num-executors       2

Check out the execution results in step 3 above.

Resource Plan 2 – Max number of executors. Min executor memory allocation.

Property Name         spark-submit Flag   Value
Number of Cores       executor-cores      1 CPU
Memory                executor-memory     1 GB
Number of executors   num-executors       15
spark-submit --class SparkRddBenchMark --executor-cores 1  --executor-memory 1g --num-executors 15 sparkrddbenchmark.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv

The Executors tab shows 15 executors launched. The run takes 2.7 mins, compared to 4.0 mins!

Resource Plan 3 – One executor per node. Min executor memory allocation

Property Name         spark-submit Flag   Value
Number of Cores       executor-cores      1 CPU
Memory                executor-memory     1 GB
Number of executors   num-executors       4
spark-submit --class SparkRddBenchMark --executor-cores 1  --executor-memory 1g --num-executors 4 sparkrddbenchmark.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv

It takes 2.3 mins, compared to 2.7 mins!

The executor tab shows 4 executors launched.

Resource Plan 4 – One executor per node. 2CPUs per executor

Property Name         spark-submit Flag   Value
Number of Cores       executor-cores      2 CPU
Memory                executor-memory     1 GB
Number of executors   num-executors       4
spark-submit --class SparkRddBenchMark --executor-cores 2  --executor-memory 1g --num-executors 4 sparkrddbenchmark.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv

It takes 2.2mins!

The executor tab shows 4 executors launched.

Resource Plan 5 – One executor per node. 2CPUs & 2GB per executor

Property Name         spark-submit Flag   Value
Number of Cores       executor-cores      2 CPU
Memory                executor-memory     2 GB
Number of executors   num-executors       4
spark-submit --class SparkRddBenchMark --executor-cores 2  --executor-memory 2g --num-executors 4 sparkrddbenchmark.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv

It takes 2.1 mins, compared to 4.0 mins with the minimal configuration, so we have almost halved the runtime!

The executor tab shows 4 executors launched!

Select a resource plan

Below is a summary of the Spark application’s execution with the various resource plans:

Resource Plan                          Executors   Cores/Executor   Memory/Executor   Run Time
Plan 1 – Min resources                 2           1                1 GB              4.0 min
Plan 2 – Max executors                 15          1                1 GB              2.7 min
Plan 3 – One executor per node         4           1                1 GB              2.3 min
Plan 4 – 2 CPUs per executor           4           2                1 GB              2.2 min
Plan 5 – 2 CPUs & 2 GB per executor    4           2                2 GB              2.1 min

Summary

  • Throwing lots of executors and resources at a Spark application may not lead to faster execution.
  • Increasing memory may not yield much of an efficiency gain beyond a certain point. Partitioning also plays a part, but more on that later (a quick way to inspect partition counts is shown below).
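As a small teaser for that discussion, the partition count of an RDD (and hence the number of tasks per stage) can be inspected directly, e.g. with the dataRDD from the program above:

    //Partition count caps the parallelism of every stage built on this RDD
    println(s"Partitions: ${dataRDD.getNumPartitions}")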

This brings us to the end of the first entry on Spark RDDs performance tuning. In the next entry, we will look at how better partitioning can help us predict the memory requirements for executors.
