Spark Core – Caching

Introduction

Cache in the computing world is a hardware or software component that stores data so that future requests can be served faster. A cache stores either pre-computed data or a copy of data stored somewhere else. Reading from the cache is usually faster than recomputing a result or reading from a slower data store, which improves software performance. The process of seeding/populating the cache is called caching.

Spark is very good at in-memory processing. We also know it is lazy when it comes to processing. Guess what – Spark caching is also lazy in nature. So let’s take an example with the following assumptions.

  • Spark has to perform two different actions on the same data
  • There is a common intermediate RDD in the lineage.

Without caching, Spark computes all the RDDs in the DAG to generate the result of the first action. Once that action is complete, it discards the intermediate RDDs, including the common intermediate RDD(s), to free up memory. When the next action comes along, Spark re-creates all the RDDs once again, including the common intermediate RDD(s).

The above is an inefficient use of resources. It may take a lot longer because the same computation has to be done twice. The Spark solution is to persist or cache the intermediate RDD(s). This has a few consequences:

  1. An RDD which is in the cache occupies memory even after the action is completed.
  2. Lineage is broken – once an RDD has been cached, Spark does not re-compute any of the RDDs leading up to it. It uses the data stored in the cached RDD and moves on.
  3. When an RDD is cached, all of its partitions across the cluster are cached.

Let’s look at the Spark APIs which allow developers to cache.

Caching APIs

cache is a method available in the org.apache.spark.rdd.RDD class. It allows you to cache the RDD. Keep in mind that this is a lazy method, so until an action is triggered the RDD will not be cached.

def cache(): RDD.this.type

Let’s see a simple example of RDD caching.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SparkTransform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")        //Master is running on a local node.
      .setAppName("SparkTransform") //Name of our Spark app.

    val sparkContext = new SparkContext(sparkConf)

    //Step 1 - Transformation - Read a text file using the SparkContext and create an RDD.
    val linesRdd = sparkContext.textFile("new_somedata.txt")

    //Step 2 - Mark linesRdd for caching. Nothing is cached until an action runs.
    linesRdd.cache()

    //Step 3 - Transformation - Split each line into words using the map method
    //and keep only the lines with fewer than 10 words.
    //map returns an RDD of arrays of strings - RDD[Array[String]].
    val wordRDD1 = linesRdd.map(_.split(" ")).filter(x => x.length < 10)

    //Step 4 - Action - count the result and print it.
    println(wordRDD1.count())

    //Step 5 - Transformation - Split each line into words using the flatMap method
    //and keep only the distinct words.
    //flatMap returns an RDD of strings - RDD[String].
    val wordRDD2 = linesRdd.flatMap(_.split(" ")).distinct()

    //Step 6 - Action - count the distinct words and print the result.
    println(wordRDD2.count())
  }
}

The code returns the same result with or without the cache method call in Step 2. However, the performance is very different, as the rough numbers below (and the timing sketch that follows them) show.

  • Without caching – it took around 9 minutes to run the code.
  • With caching – it took less than 4 minutes to run the entire code and process a 2.1 GB file.
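
Your exact numbers will vary with hardware, cluster setup and data. If you want to measure the difference yourself, here is a rough sketch – the timed helper below is purely illustrative (it is not part of the Spark API) and simply wraps the actions from the example above with wall-clock timing, not a rigorous benchmark:

//Illustrative helper: measures the wall-clock time of any block of code.
def timed[T](label: String)(block: => T): T = {
  val start  = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - start) / 1e9} seconds")
  result
}

//Time the two actions from the example above.
timed("first action (count)")(println(wordRDD1.count()))
timed("second action (distinct count)")(println(wordRDD2.count()))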

Caching Step-by-step

Let’s analyse what happened when cache was used step by step.

Step 1 – Transformation – read a text file. Spark records the instruction to create linesRdd – but does nothing yet.

Step 2 – Caching request. Spark records the instruction to cache linesRdd – but again does nothing, because caching is also lazy.

Step 3 – Transformation – another transformation to split each line into words and filter the results.

Step 4 – Action – return a count. This is where Spark actually reads the data file, caches linesRdd, applies the map and filter, and computes the count.

Step 5 – Transformation – split linesRdd again with flatMap and extract the unique/distinct words into wordRDD2.

Step 6 – Action to do a count. NOTE – in this step, instead of going all the way back, reading the 2.1 GB file and recreating linesRdd, Spark reads the data already cached in memory and completes the action. This saves time and improves performance.
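
As a quick check, you can confirm that linesRdd has actually been marked for caching by inspecting its storage level, or by looking at the Storage tab of the Spark UI. A minimal sketch using the standard RDD API:

//Prints something like "Memory Deserialized 1x Replicated"
//once linesRdd has been marked with cache()/persist().
println(linesRdd.getStorageLevel.description)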

In a real-world scenario you would have many complex transformation steps and RDDs. Caching allows you to make your code efficient. All this comes at a cost and may have unintended consequences:

  • Too much caching may cause out-of-memory issues, because cached data occupies memory.
  • Too much caching can also reduce performance – memory spent holding intermediate RDDs or DataFrames is memory not available for execution, which can force eviction or spilling and slow the job down.

persist is another method used for caching. It is similar to cache but offers more options: it takes a storage level, which lets you create different caching strategies. Let’s first look at the method signature.

def persist(newLevel: StorageLevel): RDD.this.type

The StorageLevel parameter passed to the persist method defines how cached RDDs are stored. It allows us to define different caching strategies (an example follows the list below).

  • MEMORY_ONLY is the default. Data across all the partitions is stored in the JVM as deserialized Java objects.
  • MEMORY_AND_DISK – data is stored in the JVM. Partitions that do not fit in memory are stored on disk and read from there when required. Sometimes reading from disk is faster than re-computation.
  • MEMORY_ONLY_2 – in addition to the features offered by MEMORY_ONLY, this also replicates each partition on two nodes, which is useful for recovery.
  • MEMORY_AND_DISK_2 – in addition to the features offered by MEMORY_AND_DISK, this also replicates each partition on two nodes, which is useful for recovery.
  • DISK_ONLY – data is stored only on disk in a serialized format.
  • OFF_HEAP – data is stored in serialized format in off-heap memory, outside the JVM heap.
  • MEMORY_ONLY_SER – all the data inside the RDD across all the partitions is stored in the JVM as serialized Java objects.
  • MEMORY_AND_DISK_SER – all the data inside the RDD across all the partitions is stored in the JVM as serialized Java objects. Partitions that do not fit in memory are stored on disk.
  • MEMORY_ONLY_SER_2 – in addition to the features offered by MEMORY_ONLY_SER, this also replicates the data, which is useful for recovery.
  • MEMORY_AND_DISK_SER_2 – in addition to the features offered by MEMORY_AND_DISK_SER, this also replicates the data, which is useful for recovery.
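
For example, here is a minimal sketch (reusing linesRdd from the earlier example) of persisting an RDD with an explicit storage level:

import org.apache.spark.storage.StorageLevel

//Keep partitions in memory and spill those that do not fit to disk.
//Note that cache() is simply shorthand for persist(StorageLevel.MEMORY_ONLY).
linesRdd.persist(StorageLevel.MEMORY_AND_DISK)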

Important Points

  1. You must have noticed that all the StorageLevel options ending with _2 offer replication, which is useful for recovery if there are executor or node crashes. This may make sense if there is a large cost/time involved in re-computing the RDD.
  2. Serialised options – the ones with _SER in them – occupy less space, but the downside is that there is a serialisation and deserialisation cost involved. If memory is at a premium, that trade-off can be made.
  3. For the serialised levels, Spark uses Java serialization by default; the Kryo serializer is generally faster and more compact and can be enabled via the spark.serializer setting, as shown in the sketch below.
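
A minimal standalone sketch of enabling Kryo on a SparkConf like the one used earlier:

val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkTransform")
  //Switch from the default Java serializer to Kryo.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")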

In Spark, it is possible to configure the amount of memory available for storage. See the two configuration parameters below, and the sketch that follows them.

  • spark.memory.fraction – set to 0.6 by default, which means that 60% of the JVM heap (minus a small reserved amount) is allocated for execution and storage.
  • spark.memory.storageFraction – set to 0.5 by default, which means that 50% of the region defined by spark.memory.fraction is reserved for storage of cached RDDs.
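
A minimal sketch of setting these values explicitly, reusing the sparkConf from the earlier example (the values shown are simply the defaults):

sparkConf
  //Fraction of the JVM heap (minus reserved memory) shared by execution and storage.
  .set("spark.memory.fraction", "0.6")
  //Fraction of that unified region protected for cached (storage) data.
  .set("spark.memory.storageFraction", "0.5")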

Eviction of cached data

Eviction of cached data happens based on an LRU (Least Recently Used) strategy for RDD partitions. Partitions that have not been used recently are evicted to make way for newer partitions that require storage. An evicted partition is re-computed if it is required again and may then be stored back in memory.

If MEMORY_AND_DISK is used, evicted partitions are written to disk instead of being dropped. This may seem contradictory because disk-based I/O is slow; however, re-computation may take longer than reading the data from disk.

Finally, the last API in the grand scheme of things is unpersist. It is used to remove an RDD from the cache when it is no longer needed: it removes all the blocks related to the RDD. Like cache and persist, it can be found in org.apache.spark.rdd.RDD.

def unpersist(blocking: Boolean = true): RDD.this.type

A simple example would be as below:

linesRdd.unpersist()
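
If you do not want the call to block until all the blocks have been removed, the blocking flag from the signature above can be passed explicitly:

//Remove the cached blocks asynchronously; execution continues immediately.
linesRdd.unpersist(blocking = false)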

Hopefully this entry has given you some insight into the world of Spark caching and how it can help you improve performance and make your code more efficient.

Till next time have a good time!
