Spark – Shared Variables

Shared variables are a Spark feature that lets you maintain variables across the entire Spark cluster while a job is executing. Not every variable can be shared, and there are rules around what you can and cannot do and how shared variables are created and managed. There are only two types of shared variables:

  • Broadcast Variables
  • Accumulator Variables

Broadcast Variables

Broadcast variables send read-only data from the driver node to all the worker nodes. Makes me wonder why they are called variables 🙂 if they are read-only. There are numerous use cases where the same copy of the data may be required across all the worker nodes.

Quoting from the Spark documentation:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark Documentation

In the absence of broadcast variables, Spark ships every variable referenced in a closure to the worker nodes along with the tasks. If the same variable is used in several operations, the variable (and hence the data inside it) is sent again for each one. That means inefficient job execution due to increased network traffic between the driver and the worker nodes.

Using broadcast variables, the driver node sends a read-only copy of the same data to every worker node just once. Job execution improves because of the following (a short sketch follows the list):

  • Reduced shuffling and network latency
  • Reduced communication costs
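
Here is a minimal sketch of the difference, assuming an existing SparkContext and an RDD of (airport, city) pairs like the one built in the full example later in this post:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

//Assumes a running SparkContext and an RDD of (airport, city) pairs.
def lookupCountries(sc: SparkContext, airports: RDD[(String, String)]): Unit = {
  val cityToCountry = Map("london" -> "UK", "dubai" -> "UAE")

  //Without broadcast: cityToCountry is captured in the task closure and
  //re-shipped with every operation that references it.
  airports.map(a => cityToCountry.getOrElse(a._2, "No Country")).count()

  //With broadcast: the map is shipped to each worker node once and read via .value.
  val cityToCountryBc = sc.broadcast(cityToCountry)
  airports.map(a => cityToCountryBc.value.getOrElse(a._2, "No Country")).count()
}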

One thing you need to beware of is the size of the broadcast variable. It has to fit in memory on the driver node (where it is created) and on every worker node that receives a copy; if it does not, you will get out-of-memory errors. Hence, try to keep broadcast variables small. Size is relative and depends on the resources at your disposal, but tens of MBs will work just fine. You can also tune some of the Spark configuration parameters (sketched below), but try not to go over a GB.
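
As a rough sketch, these are the standard settings involved. The values here are purely illustrative, and spark.driver.memory normally has to be set via spark-submit or spark-defaults.conf before the driver JVM starts:

import org.apache.spark.SparkConf

val tunedConf = new SparkConf()
  .set("spark.driver.memory", "4g")        //Room on the driver to build the value being broadcast
  .set("spark.executor.memory", "4g")      //Room on each executor to hold its copy
  .set("spark.broadcast.blockSize", "4m")  //Chunk size used when shipping the broadcast
  .set("spark.broadcast.compress", "true") //Compress the broadcast before shipping it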

The broadcast variable class is org.apache.spark.broadcast.Broadcast. Broadcast variables are created with SparkContext.broadcast, and the Broadcast class provides methods to read, unpersist and destroy them. Let’s look at how to use the API to create broadcast variables.
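
In a nutshell (assuming a SparkContext named sparkContext, as in the example that follows):

//Create a broadcast variable from a driver-side value.
val lookup = sparkContext.broadcast(Map("lhr" -> "london"))

println(lookup.value("lhr")) //Read the broadcast value, on the driver or inside a task

lookup.unpersist() //Drop the cached copies on the executors; it is re-sent if used again
lookup.destroy()   //Remove all state; the broadcast variable cannot be used after this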

In the example below, we will broadcast a map of cities to their countries, look it up against an RDD of airports with their cities, and count the number of airports by country. It may look a bit counter-intuitive at first, but stay the course; who said it was a walk in the park 😉

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SparkBroadcast {
  def main(args: Array[String]): Unit = {
    val sparkConf  = new SparkConf()
      .setMaster("local[*]")  //Master is running on a local node.
      .setAppName("SparkBroadcast") //Name of our spark app
    val sparkContext = new SparkContext(sparkConf)

    //Step 1 - Read the cities & airports file
    val citiesRDD = sparkContext.textFile("cities.txt").map(x => (x.split(",")(0),x.split(",")(1)))
    val airportsRDD = sparkContext.textFile("airports.txt").map(x => (x.split(",")(0),x.split(",")(1)))

    citiesRDD.foreach(println) //Print the (city, country) pairs as a quick sanity check
    //Step 2 - Counter Intuitive - Collect the records back to the driver and then broadcast.
    val citiesBroadcast = sparkContext.broadcast(citiesRDD.collectAsMap)

    //Step 3 - Use the broadcasted variable to do the processing.
    val countriesRDD = airportsRDD.map(airport => (citiesBroadcast.value.getOrElse(airport._2, "No Country"), 1))

    //Step 4 - Now apply reduceBy key to get the result
    val resultRDD = countriesRDD.reduceByKey(_+_)
    resultRDD.collect.foreach(println)
  }
}

Data files are below

airports.txt

lhr,london
lgw,london
dxb,dubai
jfk,new york
lax,los angles
ord,chicago

cities.txt

london,UK
dubai,UAE
chicago,USA
los angles,USA
new york,USA

Let’s analyse this code step by step to understand what is happening

  • Step 1 – Read the two files using the Spark context to create two RDDs.
  • Step 2 – Collect the cities RDD as a map and then broadcast it to the cluster nodes. This step is a bit counter-intuitive, as the code is split into two sub-steps:
    • The RDD is first created and then collected as a map by the driver program
    • The collected map is broadcast back to the cluster
  • Step 3 – Use the broadcast variable to look up each airport’s city. If the city is found, return its country, else return “No Country”.
  • Step 4 – A simple reduceByKey to count airports per country; the expected output is shown below.
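
With the sample files above, the job should print the counts below (the tuple order may vary from run to run): two London airports map to UK, one Dubai airport to UAE, and the three US cities to USA.

(UK,2)
(UAE,1)
(USA,3)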

Accumulator Variables

Accumulator shared variables are used to define custom aggregations across the Spark cluster. The functions they aggregate with must be associative and commutative. Now, what does that mean?

Commutative means f(x, y) = f(y, x)

Example – take f(x, y) = x + y. Then f(1, 2) = 3 and f(2, 1) = 3, which means the additive function f(x, y) = x + y is commutative.

Associative means f(f(x, y), z) = f(x, f(y, z))

Example – with the same f(x, y) = x + y, we get f(f(1, 2), 3) = f(3, 3) = 6 and f(1, f(2, 3)) = f(1, 5) = 6, so addition is also associative.

In Spark, for example, sum, min and max are both associative and commutative aggregations, but statistical functions like mean and median cannot be combined in this way directly.
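
A quick way to convince yourself, using plain Scala functions (a sketch, not Spark code):

val f = (x: Int, y: Int) => x + y

//Commutative: the order of the two arguments does not matter.
assert(f(1, 2) == f(2, 1))             // 3 == 3

//Associative: the grouping of repeated applications does not matter.
assert(f(f(1, 2), 3) == f(1, f(2, 3))) // 6 == 6

//Mean is neither: averaging partial averages gives the wrong answer.
val mean = (x: Double, y: Double) => (x + y) / 2
assert(mean(mean(1, 2), 3) != mean(1, mean(2, 3))) // 2.25 != 1.75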

In addition to the above, there are a couple of rules you need to remember about accumulator variables (see the sketch after this list):

  • They should NOT be updated inside transformations. Spark cannot guarantee correct results there because transformations are lazy and may be re-evaluated; if the accumulator update is re-evaluated, the result is wrong.
  • Spark recommends updating accumulator variables inside actions, where each task’s update is applied exactly once, hence giving correct results.
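
A sketch of the pitfall, assuming the sparkContext and some_text.txt used in the examples below:

//An accumulator updated inside a transformation (map) instead of an action.
val counter = sparkContext.longAccumulator("lines seen")

val lines = sparkContext.textFile("some_text.txt")
  .map { line => counter.add(1); line } //The update is lazy; it runs whenever the map runs

lines.count()   //First action: the accumulator is incremented once per line
lines.collect() //Second action re-runs the map, so every line is counted again

println(counter.value) //Twice the number of lines, not what we wanted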

The accumulator API was overhauled in Spark 2.0.0 and now lives in org.apache.spark.util.AccumulatorV2. AccumulatorV2 is the base class for the following subclasses (creating them is sketched after this list):

  • CollectionAccumulator
  • DoubleAccumulator
  • LongAccumulator
  • LegacyAccumulatorWrapper (used internally to wrap the pre-2.0 accumulator API)
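
The built-in accumulators are created through helper methods on SparkContext:

val longAcc   = sparkContext.longAccumulator("long accumulator")
val doubleAcc = sparkContext.doubleAccumulator("double accumulator")
val listAcc   = sparkContext.collectionAccumulator[String]("collection accumulator")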

LongAccumulator & DoubleAccumulator

Let’s examine them one by one. Below is an example using the LongAccumulator API. The example counts the number of words; it is a simple example to show how an accumulator works. DoubleAccumulator works in the same way.

import org.apache.spark.{SparkConf, SparkContext}

object SparkAccumulator {
  def main(args: Array[String]): Unit = {
    val sparkConf  = new SparkConf()
      .setMaster("local[*]")  //Master is running on a local node.
      .setAppName("SparkAccumulator") //Name of our spark app
    val sparkContext = new SparkContext(sparkConf)

    //Step 1 - Declare a long accumulator
    val custAcc = sparkContext.longAccumulator("custom accumulator")

    //Step 2 - Read the data file
    val wordsRDD = sparkContext.textFile("some_text.txt")
      .flatMap(_.split(" "))
      .map((_,1))

    //Step 3 - Call the accumulator in the action.
    wordsRDD.foreach(x => custAcc.add(x._2))

    //Step 4 - Print the value
    println(s"Count of words is ${custAcc.value}")
  }
}

The main focus of the code should be step 3, which updates the accumulator inside an action (foreach), exactly as the rules above recommend.
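
One more thing worth knowing, assuming the custAcc from the example above: LongAccumulator (and DoubleAccumulator) also keep running statistics alongside value.

println(custAcc.sum)   //Total of all the values added
println(custAcc.count) //Number of add() calls
println(custAcc.avg)   //Average of the added values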

CollectionAccumulator

This is probably the one that fewer people have used or encountered in day-to-day Spark work. This accumulator aggregates elements into a list rather than summing them.

import org.apache.spark.{SparkConf, SparkContext}

object SparkAccumulator {
  def main(args: Array[String]): Unit = {
    val sparkConf  = new SparkConf()
      .setMaster("local[*]")  //Master is running on a local node.
      .setAppName("SparkAccumulator") //Name of our spark app
    val sparkContext = new SparkContext(sparkConf)

    //Step 1 - Declare a collection accumulator
    val custAcc = sparkContext.collectionAccumulator[String]("custom accumulator")

    //Step 2 - Read the data file
    val wordsRDD = sparkContext.textFile("some_text.txt")
      .flatMap(_.split(" "))
      .map((_,1))

    //Step 3 - Call the accumulator in the action.
    wordsRDD.foreach(x => custAcc.add(x._1))

    //Step 4 - Print the value
    println(s"Collection of words is ${custAcc.value}")
  }
}

The output is probably more interesting than the code: custAcc.value returns a java.util.List[String] containing every word the tasks added, across all partitions.

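Since the value comes back as a java.util.List, a small conversion makes it easier to work with on the driver (a sketch; on Scala 2.13+ use scala.jdk.CollectionConverters instead):

import scala.collection.JavaConverters._

val words: Seq[String] = custAcc.value.asScala //java.util.List[String] -> Scala Seq
println(s"Distinct words collected: ${words.distinct.size}")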

Custom Accumulator 

The final accumulator type we will look at is a custom accumulator. A custom accumulator is written by extending the abstract class org.apache.spark.util.AccumulatorV2. Below is an example of a custom accumulator that skips the word “the” when counting words.

The first listing is the custom accumulator class, followed by the main function.

CustomAccumulator.scala

import org.apache.spark.util.AccumulatorV2

//The class takes String as input and gives Int as output hence
//AccumulatorV2[String,Int]
class CustomAccumulator(private var count: Int ) extends AccumulatorV2[String,Int]{

  //Check if the value is zero or not.
  override def isZero: Boolean = {
    value == 0
  }

  //Creates a copy of the custom accumulator object
  override def copy(): AccumulatorV2[String, Int] = {
    new CustomAccumulator(count)
  }

  //Resets the value of the custom accumulator object to zero
  override def reset(): Unit = {
    count = 0
  }

  //Add the value if the word is not "the"
  override def add(x:String): Unit = {
    if (x!="the")
      count = count + 1
  }

  //Merges the contents of the two accumulator objects
  override def merge(other: AccumulatorV2[String, Int]): Unit = {
    count = count + other.value
  }

  //Returns the value of count
  override def value: Int = {
    count
  }
}

Please read the code and the comments inside it, as they explain what each method does.

SparkAccumulator.scala

import org.apache.spark.{SparkConf, SparkContext}

object SparkAccumulator {
  def main(args: Array[String]): Unit = {
    val sparkConf  = new SparkConf()
      .setMaster("local[*]")  //Master is running on a local node.
      .setAppName("SparkAccumulator") //Name of our spark app
    val sparkContext = new SparkContext(sparkConf)

    //Step 1 - Declare a custom accumulator
    val custAcc = new CustomAccumulator(0)

    //Step 2 - Register Custom Accumulator
    sparkContext.register(custAcc,"my custom accumulator")

    //Step 3 - Read the data file
    val wordsRDD = sparkContext.textFile("some_text.txt")
      .flatMap(_.split(" "))
      .map((_,1))

    //Step 4 - Call the accumulator in the action.
    wordsRDD.foreach(x => custAcc.add(x._1))

    //Step 5 - Print the value
    println(s"Count of words is ${custAcc.value}")
  }
}

Finally, the data file used (some_text.txt):

the sample text is ready
Then this is slightly some more text
the name of the file is some text
city of angles is the los angles
sentence starts with the
the name of the book is blah

The output of the job is below

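With the sample file above, which contains 38 words in total of which 7 are exactly “the” (the comparison is case-sensitive, so the “Then” on the second line is still counted), the job should print:

Count of words is 31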

I hope this was helpful. See you again soon!
