Spark DataSets Performance Tuning – Resource Planning

Introduction

In this post we will focus on Datasets: how they can be tuned, and whether they are more efficient to work with than RDDs, even though Datasets are ultimately converted to RDDs for processing. More on that later.

To set the stage, we will use the same resource plans as in Spark RDDs Performance Tuning – Resource Planning, answer the same queries, use the same data, and run on a cluster with exactly the same components & resources. That way we also get a nice comparison with the RDD results. If you haven't read that post yet, you should :).

So let’s dive in and run our tests and see how everything stacks up.

Step 1 – Modify spark-defaults.conf

Make the following changes to spark-defaults.conf. In Amazon EMR it is found in the /etc/spark/conf/ directory.

spark.dynamicAllocation.enabled false
spark.executor.instances 1
spark.executor.cores 1
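
If you want to confirm which values actually took effect, they can be read back from the runtime configuration of a running application or spark-shell session. A minimal sketch, assuming spark is an already-built SparkSession:

// Sketch: read the effective values back from the running session.
println(spark.conf.get("spark.dynamicAllocation.enabled", "not set"))
println(spark.conf.get("spark.executor.instances", "not set"))
println(spark.conf.get("spark.executor.cores", "not set"))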

Step 2 – Cluster Configuration

For this entry we are using an AWS EMR m4.large cluster with the following hardware configuration: 1 master and 4 slave nodes, each with 4 vCores and 8 GB of RAM. All our data is stored on S3.

Summary of the cluster's hardware resources

Property Name | Value
Total Number of CPUs | 20
Total Memory | 40 GB
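
Not all of these vCores are available to a single application, since the master node and YARN overhead take their share. A quick way to see how much parallelism an application actually gets is to print defaultParallelism from inside it. A small sketch, assuming spark is a running SparkSession:

// Sketch: how many cores/partitions Spark will use by default for this application.
// On YARN this is roughly the total number of executor cores granted (minimum 2).
println("Default parallelism: " + spark.sparkContext.defaultParallelism)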

Step 3 – Run Spark Application – Minimal Configuration

This will allow us to set a baseline and give us a starting point. We will then try to improve on it incrementally and see if we can run the Spark program more efficiently. The program used for this blog entry is shown below.

/*
  Queries for benchmarking
    1. Count the total number of citations
    2. Number of citations by violation code
    3. Number of distinct routes
    4. Amount collected by route
    5. Amount collected by violation code
 */


import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object SparkDataSetBenchMark {

  def main(args: Array[String]): Unit = {
    val sparkConf  = new SparkConf()
      .setAppName("SparkRddBenchMark") //Name of our spark app

    val spark = SparkSession.builder().config(sparkConf)
      //.master("local[*]")
      .appName("SparkDataSetBenchMark")
      .getOrCreate

    import spark.implicits._

    //Step 1 - Transformation - Read the CSV file with the SparkSession, applying an explicit schema, and create a Dataset.

    val citationsSchema = StructType(
      List(
        StructField("ticket_number", StringType, true),
        StructField("issue_date", StringType, true),
        StructField("issue_time", StringType, true),
        StructField("meter_id", StringType, true),
        StructField("marked_time", StringType, true),
        StructField("rp_state_plate", StringType, true),
        StructField("plate_expiry_date", StringType, true),
        StructField("vin", StringType, true),
        StructField("make", StringType, true),
        StructField("body_style", StringType, true),
        StructField("color", StringType, true),
        StructField("location", StringType, true),
        StructField("route", StringType, true),
        StructField("agency", StringType, true),
        StructField("violation_code", StringType, true),
        StructField("violation_description", StringType, true),
        StructField("fine_amount", IntegerType, true),
        StructField("latitude", StringType, true),
        StructField("longitude", StringType, true)
      )
    )
     val baseDS = spark.read
      .option("header","true")
      .option("delimiter",",")
      .schema(citationsSchema)
      .csv(args(0))

    //Step 2 - Query 1 - Count the total number of citations [Spark action]
    println("Query 1 - Total number of citations: " + baseDS.count)

    //Step 3 - Query 2 - Number of citations by violation code
    val query2DS = baseDS.groupBy($"violation_code").count()
    println("Query 2 - Number of citations by violation code:")
    query2DS.show()

    //Step 4 - Query 3 - Number of distinct routes
    println("Query 3 - Number of distinct routes:"+baseDS.select($"route").distinct.count)

    //Step 5 - Query 4 - Amount collected by route
    val query4DS = baseDS.groupBy($"route").sum("fine_amount")
    println("Query 4 - Amount collected by route:")
    query4DS.show()

    //Step 6 - Query 5 - Amount collected by violation code
    val query5DS = baseDS.groupBy($"violation_code").sum("fine_amount")
    println("Query 5 - Amount collected by violation code:")
    query5DS.show()
  }
}
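
The program above works with the untyped Dataset[Row] API, i.e. a DataFrame. If you prefer the typed Dataset API, the same data can be mapped onto a case class. Below is a minimal sketch that reuses citationsSchema and args(0) from the program above; the Citation case class and the reduced column set are purely illustrative and not part of the original benchmark.

// Illustrative only: define the case class at the top level (outside main)
// so that Spark can derive an encoder for it.
case class Citation(ticket_number: String, issue_date: String, route: String,
                    violation_code: String, fine_amount: Option[Int])

// Inside main, after citationsSchema has been defined:
import spark.implicits._
val typedDS = spark.read
  .option("header", "true")
  .schema(citationsSchema)
  .csv(args(0))
  .select("ticket_number", "issue_date", "route", "violation_code", "fine_amount")
  .as[Citation]

println("Query 3 (typed) - Number of distinct routes: " + typedDS.map(_.route).distinct.count)

The typed version still runs through the same Catalyst planner, although lambda-based operations such as map can add serialization overhead compared to column expressions.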

The benchmark program can be executed using the following command:

spark-submit --class SparkDataSetBenchMark sparkdatasetbenchmark.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv

When this is executed on the cluster it creates 2 executors and completes in about 2.5 minutes. For comparison, the minimal configuration in the RDDs post took about 4 minutes.
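
The 2.5-minute figure above was read off the Spark UI. If you also want per-query timings in the driver output, SparkSession.time can be used to wrap each action. A small sketch, assuming it is placed inside the program above where baseDS and the implicits are in scope:

// Sketch: wrap individual actions to print their wall-clock duration
// ("Time taken: ... ms") alongside the query output.
spark.time { println("Query 1 - Total number of citations: " + baseDS.count) }
spark.time { baseDS.groupBy($"violation_code").count().show() }
spark.time { println("Query 3 - Number of distinct routes: " + baseDS.select($"route").distinct.count) }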

Step 4 – Create Resource Plans

Below are the resource plans used to set the stage. They are the same as the ones used in Spark RDDs Performance Tuning – Resource Planning.

Step 5 – Use resource plans

Tuning a cluster is about providing the right type & amount of resources to the Spark application. Adding more memory, more CPUs and/or more executors will not always solve the problem, and can even exacerbate the performance issue you are trying to fix.

Brute force does not always work

That saying is borne out even more clearly in this post.

Resource Plan 1 – Min resources

Property Name | Value
Number of Cores (executor-cores) | 1 CPU
Memory (executor-memory) | 1 GB
Number of executors (num-executors) | 2

Check out the execution results in step 3 above.

Resource Plan 2 – Max number of executors. Min executor memory allocation.

Property Name | Value
Number of Cores (executor-cores) | 1 CPU
Memory (executor-memory) | 1 GB
Number of executors (num-executors) | 15

The following spark-submit command is used for execution:

spark-submit --class SparkDataSetBenchMark --executor-cores 1  --executor-memory 1g --num-executors 15 sparkdatasetbenchmark.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv

The Executors tab shows 15 executors launched. The job takes 2.2 minutes, which is not a great saving even though we are now using pretty much the entire cluster.
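
If you prefer to confirm the executor count from inside the application rather than the Executors tab, the status tracker exposes it. A quick sketch; note that the returned list also includes the driver:

// Sketch: list the executors that actually registered with the driver.
val infos = spark.sparkContext.statusTracker.getExecutorInfos
println("Executors (excluding driver): " + (infos.length - 1))
infos.foreach(i => println(i.host + ":" + i.port + " - running tasks: " + i.numRunningTasks))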

Resource Plan 3 – One executor per node. Min executor memory allocation

Property Name | Value
Number of Cores (executor-cores) | 1 CPU
Memory (executor-memory) | 1 GB
Number of executors (num-executors) | 4

The following spark-submit command is used for execution:

spark-submit --class SparkDataSetBenchMark --executor-cores 1  --executor-memory 1g --num-executors 4 sparkdatasetbenchmark.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv

The Executors tab shows 4 executors launched. The job takes only 1.7 minutes, compared to 2.2 minutes in the previous plan with 15 executors.

Resource Plan 4 – One executor per node. 2 CPUs per executor

Property Name | Value
Number of Cores (executor-cores) | 2 CPUs
Memory (executor-memory) | 1 GB
Number of executors (num-executors) | 4

The following spark-submit command is used for execution:

spark-submit --class SparkDataSetBenchMark --executor-cores 2  --executor-memory 1g --num-executors 4 sparkdatasetbenchmark.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv

The Executors tab shows 4 executors launched. The job again takes 1.7 minutes; the increase in CPUs did not help.

Resource Plan 5 – One executor per node. 2 CPUs & 2 GB per executor

Property Name | Value
Number of Cores (executor-cores) | 2 CPUs
Memory (executor-memory) | 2 GB
Number of executors (num-executors) | 4

The following spark-submit command is used for execution:

spark-submit --class SparkDataSetBenchMark --executor-cores 2  --executor-memory 2g --num-executors 4 sparkdatasetbenchmark.jar s3a://cloudwalker-sparks3/input/parking-citations-cleaned-2.csv

Observe that the time has now increased even though we added memory and CPU. The brute-force approach is actually exacerbating the performance issue!
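
One way to sanity-check what the executors actually received is the executor memory status that each executor reports to the driver. A rough sketch; the figures are block-manager (storage) memory, not the full executor heap:

// Sketch: print the storage memory each executor reports to the driver.
spark.sparkContext.getExecutorMemoryStatus.foreach { case (executor, (max, free)) =>
  println(s"$executor -> max: ${max / 1024 / 1024} MB, free: ${free / 1024 / 1024} MB")
}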

Step 6 – Select a resource plan

Below is the summary of the Spark application execution times with the various resource plans.

Summary

Below is a summary of the Spark application execution with the various resource plans, along with a comparison against the RDD application from Spark RDDs Performance Tuning – Resource Planning.

  • Throwing a lot of executors and resources at a Spark application may not lead to faster execution.
  • Beyond a certain point, increasing memory may not give much of an efficiency gain.

This brings us to the end of the first entry on Spark DataSets Performance Tuning. In the next entries we will look at how better shuffling, caching & partitioning can be used to make better use of executor resources.
