We cannot prevent shuffling; we can only reduce it. Ask a Spark developer about performance issues and the two things they will talk about are shuffling and partitioning. In this entry we will focus just on shuffling. It would be a good idea to read one of the earlier posts on the Spark job execution model first.
When data is distributed across a Spark cluster, it is not always where it needs to be. Therefore, from time to time Spark may need to move data across the various nodes to complete specific computations.
Shuffling is the process of redistributing data across partitions; data is moved between nodes. Technically, a shuffle consists of network I/O, disk I/O and data serialisation. Network I/O is the most expensive operation and is usually the focus when people talk about shuffling. Disk I/O is probably the surprising part, but it is interesting as well and helps in the overall understanding of Spark's shuffle machinery.
There is no specific API available to developers to invoke a shuffle directly, but there are numerous operations in Spark which will trigger one – groupByKey, for example. More generally, all wide transformations trigger a shuffle.
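To build intuition for why groupByKey forces a shuffle, here is a minimal pure-Python sketch (no Spark required) of the idea behind Spark's default HashPartitioner: a key belongs to partition hash(key) % numPartitions, so every record whose key hashes to a different node than the one it currently sits on has to move. The data and partition count here are invented for illustration.

```python
def partition_for(key, num_partitions):
    # Mirrors the idea behind Spark's HashPartitioner: key -> partition id.
    return hash(key) % num_partitions

records = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
num_partitions = 2

# Redistribute every record to the partition its key hashes to.
# In a cluster, records changing partition is exactly the shuffle traffic.
shuffled = {p: [] for p in range(num_partitions)}
for key, value in records:
    shuffled[partition_for(key, num_partitions)].append((key, value))

# After the shuffle, all values for a given key live in one partition,
# so groupByKey can finish the grouping locally on each node.
for p, recs in shuffled.items():
    print(p, recs)
```

The invariant that matters is not which partition a key lands in, but that every record with the same key lands in the same one.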
Job Stage & Shuffle
Quick Recap – We know that Spark groups together operations which can run on a single data partition. Each such group of operations is called a stage. Spark then creates a second stage based on the redistributed data, and keeps going until it finally reaches the result.
Let’s dig a bit deeper into the creation of stages to understand this better. Two things happen:
- Identifying an RDD which needs a shuffle. Such an RDD is called a ShuffledRDD, and it is identified when the DAG is formed. The API documentation is at org.apache.spark.rdd.ShuffledRDD.
- The DAG scheduler takes this DAG as input and, wherever there is a ShuffledRDD, identifies its dependencies using the getDependencies method – thereby creating the stages for the job.
This is where we dive into shuffle itself. In this section we will cover three things:
- Bit of History
- Sort based Shuffle
- Shuffle Configuration
Bit of History
Note – This section assumes some familiarity with the map-reduce paradigm.
Once upon a time 🙂 – in the pre-Spark-1.2 days – Spark used hash-based shuffling; it was later replaced with sort-based shuffling. By 2.0, hash-based shuffling was gone.
Hash-based shuffling meant that every task (mapper) in a stage would generate a separate file for every task (reducer) in the next stage.
The above is a simple scheme, but it illustrates the issues with this kind of shuffle:
- There is heavy disk I/O during the shuffle.
- With tens of thousands of these intermediate files, or more, performance degrades badly.
The task which generates the intermediate files is called a ShuffleMapTask.
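A quick back-of-the-envelope calculation makes the file-count explosion concrete. The stage sizes below are hypothetical, chosen only to show why a shuffle that writes one file per (mapper, reducer) pair does not scale, while a scheme that writes two files per map task does:

```python
# Hypothetical stage sizes, just to illustrate the scaling problem.
map_tasks = 1000      # tasks in the current (map) stage
reduce_tasks = 1000   # tasks in the next (reduce) stage

# Hash-based shuffle: one file per (mapper, reducer) pair.
hash_files = map_tasks * reduce_tasks

# Sort-based shuffle: one sorted data file + one index file per map task.
sort_files = map_tasks * 2

print(hash_files)  # one million intermediate files
print(sort_files)  # two thousand files
```

A thousand-task stage on each side already produces a million hash-shuffle files, which is exactly the "bad times" scenario described above.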
Sort based shuffle
Sort-based shuffle works by each task generating multiple files but then merging them together into a single file sorted by the key’s partition id. In addition to the sorted data file, it also generates an index file which records where the data for each partition starts and ends.
The index created is a sparse index: a list of key-value pairs of partition id and start location, i.e. (partitionId, startLocation).
This method of shuffling has two advantages:
- It reduces the number of files dramatically, to a manageable level – two per ShuffleMapTask. In addition, this data can be compressed, reducing memory utilised and out-of-memory errors.
- With the large number of files in hash-based shuffle, the number of seek requests was high, which also led to inefficient shuffles. Sort-based shuffle eliminates this as well.
Note – Spark does not sort the data inside each partition when the intermediate data file is created; any sorting by key is done by the reducer tasks.
To summarize, sort-based shuffle creates only two output files per map task:
- an intermediate data file sorted by the key’s partition id
- an index file.
When the reducer tasks request data, they provide the ranges of the partitions they need and hence receive exactly the data they require.
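The two files and the reducer-side range lookup can be sketched in plain Python. This is not Spark's actual on-disk format – the record encoding, the in-memory "file", and the helper names are all invented for illustration – but the mechanics are the same: records are ordered by partition id, the sparse index holds (partitionId, startOffset) pairs, and a reducer reads just the byte range for its partition.

```python
import io

num_partitions = 3
records = [("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5)]

def partition_for(key):
    # Stand-in for Spark's hash partitioner.
    return hash(key) % num_partitions

# --- Map side: order records by partition id, write one data "file" ---
by_partition = sorted(records, key=lambda kv: partition_for(kv[0]))
data_file = io.BytesIO()
index = []            # sparse index: (partition_id, start_offset)
current = None
for key, value in by_partition:
    pid = partition_for(key)
    if pid != current:            # record where each partition's data begins
        index.append((pid, data_file.tell()))
        current = pid
    data_file.write(f"{key}={value}\n".encode())
end_offset = data_file.tell()

# --- Reduce side: fetch only the byte range belonging to one partition ---
def read_partition(pid):
    starts = dict(index)
    if pid not in starts:         # sparse: empty partitions have no entry
        return b""
    offsets = sorted(starts.values()) + [end_offset]
    start = starts[pid]
    stop = offsets[offsets.index(start) + 1]  # next partition's start
    data_file.seek(start)
    return data_file.read(stop - start)

for pid in range(num_partitions):
    print(pid, read_partition(pid))
```

Note that the records inside a partition are not sorted by key – only the partition ids are ordered – which matches the note above about sorting being left to the reducer tasks.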
Shuffle Configuration
There are various configuration parameters which you can use to manage how shuffling takes place. Usually you do not need to change them – the defaults will suffice – but you can always change them if required. Have a look at this link for the complete list; in this entry only a few are covered.
- spark.shuffle.compress – default true. Compresses map output files. The compression codec is set via spark.io.compression.codec.
- spark.shuffle.spill.compress – default true. Compresses data which is spilled to disk during shuffles. The compression codec is set via spark.io.compression.codec.
- spark.shuffle.file.buffer – default 32k. The size of the in-memory buffer for each shuffle file output stream; a larger buffer reduces the number of disk seeks and improves performance.
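As a concrete (hypothetical) example, these properties can be set in spark-defaults.conf; the values below are illustrative tuning choices, not recommendations:

```properties
# Illustrative spark-defaults.conf fragment (values are examples only)
spark.shuffle.compress        true
spark.shuffle.spill.compress  true
spark.shuffle.file.buffer     64k
spark.io.compression.codec    lz4
```

The same properties can equally be passed per job via --conf flags to spark-submit.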
I hope you found this entry useful. See you back soon!