Spark Partitioning

Introduction

Transparently partitioning data across the worker nodes is one of the best features of Spark, and it has made writing code simpler. Partitioning that is not done properly leads to data skew and, eventually, performance issues. So having the right number of partitions, with the data in the right place, is important to reduce performance issues and data shuffling.

This post will cover the various types of partitioning and show how you can quickly use them in your Spark jobs.

IMPORTANT: Partitioning APIs are only available for pair RDDs. Pair RDDs are RDDs whose elements are key-value pairs.

Before diving into partitioning, let's look at a couple of highlights around partitions.

  • Each partition resides on only one worker node. It cannot span across nodes.
  • Every worker node in a Spark cluster has at least one partition, and can have more.
  • Whenever you specify a partitioning strategy (standard or custom), make sure you persist/cache the data.

There are three types of partitioning in Spark:

  • Hash based Partitioning 
  • Range Partitioning
  • Custom Partitioning – yes, it is possible, and yes, it is simple!

When no partitioning strategy is defined:

  1. Data is partitioned across the various worker nodes randomly.
  2. Partition sizes are almost uniform (see the sketch below).
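
For instance, here is a minimal sketch (assuming a SparkContext named sparkContext, as in the examples later in this post, and a hypothetical input file some_data.txt) that inspects how a pair RDD is spread out when no partitioner is specified:

//Sketch: inspect the default partitioning of a pair RDD with no explicit partitioner
val defaultRDD = sparkContext.textFile("some_data.txt")
  .map(x => (x.take(1).toLowerCase, x.toLowerCase))   //Create a key-value pair

println(defaultRDD.partitioner)                                   //None - no partitioner is set
println(s"Number of partitions: ${defaultRDD.getNumPartitions}")
defaultRDD.glom.collect().foreach(p => println(s"Partition size: ${p.length}"))  //Sizes should be roughly uniform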

Hash Partitioner

The hash partitioner is one of the standard partitioners available in Spark, and it is fairly simple in its execution. It is defined in org.apache.spark.HashPartitioner. The hash code of the key in a pair RDD is taken and a modulo with the number of partitions is applied.

See below:

// numPartitions - the number of partitions you want.
// partitionId - the partition Id assigned to the key in the pairRDD
partitionId = key.hashCode % numPartitions

Let's create a list of key-value pairs to explain this a little more.

Below is the list of key-value pairs.

List = (100,2), (29,4), (51,6), (28,5), (9,4)

Suppose you want to divide your data into 3 partitions. That means there are going to be three partitions, numbered 0, 1 and 2. For simplicity, let's assume our hashing function gives back the same value as the key (which is exactly what the hash code of an Int is), so if a value of 100 is passed we get back 100. Let's apply our logic.

For the key-value pair (100,2):
partitionId = hash(100) % 3
partitionId = 100 % 3
partitionId = 1
So the key-value pair is assigned to partition 1.

For the key-value pair (29,4):
partitionId = hash(29) % 3
partitionId = 29 % 3
partitionId = 2
So the key-value pair is assigned to partition 2.

For the key-value pair (51,6):
partitionId = hash(51) % 3
partitionId = 51 % 3
partitionId = 0
So the key-value pair is assigned to partition 0.

For the key-value pair (28,5):
partitionId = hash(28) % 3
partitionId = 28 % 3
partitionId = 1
So the key-value pair is assigned to partition 1.

For the key-value pair (9,4):
partitionId = hash(9) % 3
partitionId = 9 % 3
partitionId = 0
So the key-value pair is assigned to partition 0.

Once hash partitioning is applied, the data will be distributed across the following partitions:

  • Partition Id 0 has (51,6) , (9,4)
  • Partition Id 1 has (100,2) , (28,5)
  • Partition Id 2 has (29,4)

We do not need to write our own hashing function; it is provided by the standard Java library.
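
As a quick sanity check, here is a minimal standalone Scala sketch (no cluster needed) that uses HashPartitioner.getPartition to confirm the assignments we worked out by hand:

import org.apache.spark.HashPartitioner

object HashPartitionCheck {
  def main(args: Array[String]): Unit = {
    val pairs = List((100, 2), (29, 4), (51, 6), (28, 5), (9, 4))
    val partitioner = new HashPartitioner(3)   //3 partitions: 0, 1, 2

    //For Int keys the hash code is the value itself, so this matches the manual calculation
    pairs.foreach { case (key, value) =>
      println(s"($key,$value) -> partition ${partitioner.getPartition(key)}")
    }
  }
}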

IMPORTANT: When you change the way the data is partitioned, it is a good idea to cache the result. Spark uses lazy evaluation, and once the data has been partitioned and cached it is kept in those partitions, so there is no need to re-apply the partitioning for different jobs on the same data. See below.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object SparkPartitioning {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]") //Master is running on a local node.
      .setAppName("SparkPartitioning") //Name of our spark app
    val sparkContext = new SparkContext(sparkConf)

    //Step 1 - Transformation - Read a text file using the Spark Context and create an rdd.
    val linesRDD = sparkContext.textFile("some_data.txt")

    //Step 2 - Transformation - Split the text file into words and create a (key,value) pair
    val wordsRDD = linesRDD.flatMap(_.split(" "))        //Split the lines into words
      .map(x=>(x.take(1).toLowerCase,x.toLowerCase))     //Create a key value pair
      .partitionBy(new HashPartitioner(4))               //Create a new hash partitioner
      .cache


    //Step 3 - Print each partition as a list.
    wordsRDD.glom.collect().toList.foreach(x=>println(x.toList))
    println(s"Number of Partitions: ${wordsRDD.getNumPartitions}")


    //Step 4 - Action - Count and print
    println(s"Word count is ${wordsRDD.count}")
  }
}

The above code uses hash partitioning to create partitions and prints out the contents of the partitions as a list of values. See the output below.

Observe in the output that the same keys are grouped in the same partition. This will help reduce shuffling if you were to use the groupByKey or reduceByKey methods on this data set.
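
For example, here is a small sketch (reusing wordsRDD from the code above) that counts words on top of the already-partitioned RDD. Because mapValues preserves the existing hash partitioner, reduceByKey can aggregate without another shuffle:

//Sketch: count occurrences per key on the already hash-partitioned wordsRDD
//mapValues keeps the HashPartitioner, so reduceByKey does not need to shuffle again
val wordCounts = wordsRDD.mapValues(_ => 1).reduceByKey(_ + _)
wordCounts.collect().foreach(println)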

Range Partitioner

The range partitioner is useful when we have an ordered list of key-value pairs. In this type of partitioning, the key ranges are created by the partitioning API based on a sample of the data. When range partitioning is applied to an RDD, all the keys falling within a range are placed in the same partition. The range partitioner API is defined in org.apache.spark.RangePartitioner.

Let’s see an example below

import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object SparkPartitioning {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]") //Master is running on a local node.
      .setAppName("SparkPartitioning") //Name of our spark app
    val sparkContext = new SparkContext(sparkConf)


    //Step 1 - Transformation - Read a text file using the Spark Context and create an rdd.
    val dataRDD = sparkContext.textFile("sorted_data.txt")
      .map(x=>(x.take(1).toLowerCase,x.toLowerCase))          //Create a key value pair

    //Step 2 - Create a range partitioner
    //use dataRDD for getting the correct range
    val rangePartitioner = new RangePartitioner(partitions = 4,rdd = dataRDD, ascending = true)

    //Step 3 - Transformation - Apply the range partitioner to dataRDD
    val wordsRDD = dataRDD.partitionBy(rangePartitioner)        //Partition dataRDD using the range partitioner
      .cache                                                    //Cache the RDD once it has been partitioned.

    //Step 4 - Print each partition as a list.
    wordsRDD.glom.collect().toList.foreach(x=>println(x.toList))
    println(s"Number of Partitions: ${wordsRDD.getNumPartitions}")


    //Step 5 - Action - Count and print
    println(s"Word count is ${wordsRDD.count}")
  }
}

The above code uses range partitioning to create partitions and prints out the contents of the partitions as a list of values. See the relevant output below.
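
As a quick check, here is a sketch (reusing rangePartitioner from the code above) that asks the partitioner directly where a few sample keys would land. The exact boundaries depend on the sampled data, but because ascending = true, keys that sort lower land in lower-numbered partitions:

//Sketch: ask the range partitioner which partition a few sample keys map to
Seq("a", "h", "p", "z").foreach { key =>
  println(s"key '$key' -> partition ${rangePartitioner.getPartition(key)}")
}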

Custom Partitioner

A custom partitioner class can be used if none of the standard partitioning APIs meet the requirements. It is quite easy to write your own custom partitioner. We need to extend the abstract class org.apache.spark.Partitioner, which is also the parent class of the hash and range partitioner classes.

We need to implement the following methods:

  1. abstract def getPartition(key: Any): Int – This method takes the key as input and returns the partition ID, from 0 to numPartitions - 1.
  2. abstract def numPartitions: Int – This method returns the number of partitions created by the partitioner object.

Below is the code for the custom partitioner class. 

MyPartitioner.scala

import org.apache.spark.Partitioner

class MyPartitioner(override val numPartitions:Int) extends Partitioner{
  override def getPartition(key: Any): Int = {
    //Check the key
    //If key is a,b,c then assign partition 0
    //If key is d,e,f then assign partition 1
    //If key is j,l,m then assign partition 2
    //If key is v,s,t then assign partition 3
    //else 4
    key.asInstanceOf[String] match {
      case "a"|"b"|"c" => 0
      case "d"|"e"|"f" => 1
      case "j"|"l"|"m" => 2
      case "v"|"s"|"t" => 3
      case _ => 4
    }
  }
}

Below is the code that uses the custom partitioner we have defined.

SparkPartitioning.scala

import org.apache.spark.{SparkConf, SparkContext}

object SparkPartitioning {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]") //Master is running on a local node.
      .setAppName("SparkPartitioning") //Name of our spark app
    val sparkContext = new SparkContext(sparkConf)


    //Step 1 - Transformation - Read a text file using the Spark Context and create an rdd.
    val dataRDD = sparkContext.textFile("sorted_data.txt")
      .map(x=>(x.take(1).toLowerCase,x.toLowerCase))          //Create a key value pair
   

    //Step 2 - Transformation - Apply the custom partitioner to dataRDD
    val wordsRDD = dataRDD.partitionBy(new MyPartitioner(numPartitions = 6))        //Partition dataRDD using the custom partitioner
      .cache                                                     //Cache the RDD once it has been partitioned.

    //Step 3 - Print each partition as a list.
    wordsRDD.glom.collect().toList.foreach(x=>println(x.toList))
    println(s"Number of Partitions: ${wordsRDD.getNumPartitions}")


    //Step 4 - Action - Count and print
    println(s"Word count is ${wordsRDD.count}")
  }
}

The above code uses the custom partitioner class MyPartitioner to create partitions and prints out the contents of the partitions as a list of values. Partition 5 is empty because the MyPartitioner class never assigns any data to it (the match only returns partition IDs 0 to 4). See the relevant output below.

Good to Know Things about Partitioning

Before we close, there are a couple of things to remember when applying partitioning strategies (either standard or custom). These are good to know and will help in troubleshooting.

Default Partitioning in specific Transformations

1. groupByKey – the default partitioner is the hash partitioner. It is possible to assign a different partitioner; see the API documentation for details. Below is an example of providing a different partitioner if required.

//MyPartitioner is the custom partitioner class defined earlier
//Set the number of partitions during initialisation
val wordsRDD = dataRDD.groupByKey(new MyPartitioner(numPartitions = 5))


2. sortByKey – the default partitioner is a range partitioner. The partitioning strategy cannot be changed, but you can specify the number of partitions required. See below.

val wordsRDD = dataRDD.sortByKey(numPartitions = 3)

3. repartitionAndSortWithinPartitions – This method repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys. We can also specify a custom partitioner in this method if required. See the sketch below.
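
Here is a minimal sketch (assuming dataRDD is the pair RDD from the earlier examples and HashPartitioner is imported) showing the call with a standard partitioner; a custom partitioner such as MyPartitioner could be passed instead:

//Sketch: repartition with a hash partitioner and sort records by key within each partition
val repartitionedRDD = dataRDD.repartitionAndSortWithinPartitions(new HashPartitioner(4))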

Losing Partitioning Strategy during Transformations

While using custom partitioning, it is possible that applying a transformation results in an RDD which no longer has the same partitioner attached to it. Oops! That is not something which is well known, and it should be kept in mind. Some of the most used transformations have this characteristic, for example map and flatMap.

So if you use the map transformation on an RDD which has a custom partitioning strategy, the resulting RDD loses that partitioner. The reason is that during some of these transformations on pair RDDs it is possible to change the keys, and hence the same partitioning strategy may no longer be applicable.

There are two ways around it (see the sketch after this list):

  1. Use a transformation like mapValues, which means there is no change to the keys, so the partitioner is retained in the resulting RDD.
  2. Re-apply the same strategy if your keys are not changing and you are using a transformation that will cause you to lose the partitioner.
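
Here is a minimal sketch (assuming dataRDD is the pair RDD from the earlier examples and HashPartitioner is imported) that uses rdd.partitioner to show the partitioner being lost by map and retained by mapValues:

//Sketch: inspect rdd.partitioner to see which transformations keep the partitioner
val partitionedRDD = dataRDD.partitionBy(new HashPartitioner(4)).cache

println(partitionedRDD.partitioner)                        //Some(HashPartitioner) - set by partitionBy
println(partitionedRDD.map(identity).partitioner)          //None - map may change keys, so the partitioner is lost
println(partitionedRDD.mapValues(_.length).partitioner)    //Some(HashPartitioner) - keys unchanged, partitioner retained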

Below are some of the transformations which will not change the partitioner when they are applied on pair RDDs.

  1. groupByKey
  2. reduceByKey
  3. cogroup
  4. join
  5. mapValues
  6. flatMapValues
  7. filter

Phew! That brings me to the end of this looooooooongish post. Hope this is helpful to you.
