Spark Job Execution Model

Introduction

I have been wanting to write this entry for a long time. The Spark job execution model – how Spark works internally – is an important topic of discussion. Knowing the internal execution engine can be a real help when doing performance tuning. Let’s look at Spark’s execution model.

The flow of execution of any Spark program can be explained using the following diagram.

Spark provides all this information quite nicely in its chatty logs. If you have a moment, take a look at the Spark logs; they offer a lot of good information. More on the logs later.

Now let’s see how the jobs are actually executed. We already know the runtime components of a Spark cluster, so some of these components will be familiar. In this entry we will look at how these components interact, in what sequence events happen, and where they happen. We also introduce two more components into the mix – the DAG Scheduler and the Task Scheduler.

The DAG Scheduler is the component which takes the logical plan of RDD dependencies and converts it into a physical execution plan of stages.

The Task Scheduler is the component responsible for submitting tasks to the executors on the worker nodes. The scheduler applies a scheduling policy which is set by the spark.scheduler.mode configuration parameter. There are two policies, as sketched below:

  • FIFO (default)
  • FAIR
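
For reference, a minimal sketch of how the scheduling policy could be switched on the SparkConf; spark.scheduler.mode and the FIFO/FAIR values come from the Spark configuration, while the app name here is just a placeholder:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[*]")                 // run locally, one worker thread per core
  .setAppName("SchedulerModeDemo")       // placeholder app name
  .set("spark.scheduler.mode", "FAIR")   // override the default FIFO policy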

Spark Driver Program

Looking at it from a functional perspective, this is how it looks:

A driver program is broken down into jobs. Jobs are further broken down into stages, and stages are further broken down into tasks. To understand how Spark creates jobs, stages and tasks, let’s take a simple example and analyse it. Keep in mind there are three input files.

  • some_data1.txt
  • some_data2.txt
  • some_data3.txt

All these files are listed at the end of the entry.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SparkExecutionModel {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]") //Run locally, one worker thread per available core
      .setAppName("SparkExecutionModel") //Name of our Spark app
    val sparkContext = new SparkContext(sparkConf)

    //Step 1 - Transformation - Read the text files and split each line into words
    val linesRDD = sparkContext.textFile("some_data*").flatMap(_.split(" "))

    //Step 2 - Transformation - Map each word to a (word, length) key/value pair
    val wordsRDD = linesRDD.map(x => (x, x.length))

    //Step 3 - Action - Group by key and print each pair
    wordsRDD.groupByKey.foreach(println)

    //Step 4 - Action - Count and print
    println(s"Word count is ${wordsRDD.count}")
  }
}

Spark Jobs

The following is the list of jobs which were generated for the code above.

IMPORTANT: Observe that the number of actions in the driver program corresponds to the number of jobs in the DAG which is generated. In this case there are two actions and hence two jobs – Job Id 0 & Job Id 1.
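
To make the action-to-job mapping concrete, here is a hypothetical extension of the example above (not part of the original program): adding one more action would produce a third job in the Spark UI.

wordsRDD.groupByKey.foreach(println)         // action #1 -> Job Id 0
println(s"Word count is ${wordsRDD.count}")  // action #2 -> Job Id 1
println(s"First pair is ${wordsRDD.first}")  // action #3 -> would appear as Job Id 2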

Job Stages

Let’s dig a bit more and click on Job Id 0. The picture below shows the stages for Job Id 0.

This is where it gets interesting. How does Spark decide on stages? As we know by now, Spark is lazy and tries to use resources efficiently. Spark looks at operations it can perform independently on each partition and groups them together in a stage until there is a need to redistribute the data. This redistribution of data is also called shuffling. More on shuffling in a separate blog entry.

Now, with this information let’s look at our stages.

Stage 0 uses flatMap & map. Both of these are narrow transformations and do not need to move or redistribute the data, so they can be clubbed together into one stage.

Stage 1 consists of one transformation – groupByKey. This is a wide transformation and requires data from other partitions in the Spark cluster, so it leads to redistribution of data, a.k.a. shuffling. The foreach action does not need to shuffle the data any further – it simply operates on each partition after the shuffle – and hence ends up in the same stage.
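
If you want to see this stage boundary without opening the web UI, one option – a small sketch assuming the RDDs from the example above – is to print the RDD lineage with toDebugString; the ShuffledRDD entry and the change in indentation mark where Spark will start a new stage:

// Print the lineage of the grouped RDD; the indentation break around the
// ShuffledRDD marks the shuffle boundary between stage 0 and stage 1.
println(wordsRDD.groupByKey.toDebugString)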

Tasks

Finally, we come to tasks. A task consists of two things: a chunk of data (a partition) and the code for the stage – think closures. All the code for a stage is executed against that data. The driver sends tasks to the executor(s). Note – because the driver schedules tasks from a single thread, there is usually a small delay between the launch of individual tasks, even though they then run in parallel at the same time.

Let’s drill down into our stage by clicking on stage 0 – this is the stage which does all the reading of the files and the mapping. The most efficient approach in this case is to read all the files in parallel, and that is exactly what Spark does by launching three tasks in stage 0. In my environment there are three cores and each executor is allocated one core. Below is the screen grab from the Event Timeline in the Spark web UI. Keep in mind that Spark completes all the tasks in a stage before moving on to the next stage. In our case it will complete stage 0 before moving on to stage 1.
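
You can also confirm the task count programmatically, since the number of tasks in this read stage equals the number of partitions of the input RDD – a sketch assuming the linesRDD from the example above:

// One task per partition: with three input files, textFile typically
// creates three partitions, so stage 0 launches three tasks.
println(s"Number of partitions: ${linesRDD.getNumPartitions}")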

If there were four data files instead of three, there would be four tasks. However, the fourth task would only be executed after one of the first three finishes and an executor core becomes available. Below is the screen grab of the event timeline from such an execution.

Below are the data files which I used for testing.

some_data1.txt

Rank tall boy man them over post now.
Off into she bed long fat room.
Recommend existence curiosity perfectly favourite get eat she why daughters.
Not may too nay busy last song must sell.
An newspaper assurance discourse ye certainly.
Soon gone game and why many calm have.

some_data2.txt

Delightful remarkably mr on announcing themselves entreaties favourable.
About to in so terms voice at.
Equal an would is found seems of.
The particular friendship one sufficient terminated frequently themselves.
It more shed went up is roof if loud case.
Delay music in lived noise an.
Beyond genius really enough passed is up.

some_data3.txt

In as name to here them deny wise this.
As rapid woody my he me which.
Men but they fail shew just wish next put.
Led all visitor musical calling nor her.
Within coming figure sex things are.
Pretended concluded did repulsive education smallness yet yet described.
Had country man his pressed shewing.
No gate dare rose he.
Eyes year if miss he as upon.

Hope this entry has been useful. Till next time …Good bye!
