DataSet Operations – Multi-Dimensional Aggregation

Spark's multi-dimensional aggregation methods, cube and rollup, create aggregates across various combinations of columns. In a sense, they are an extension of groupBy with a union rolled in: they build several levels of aggregation in a single call and save you from writing an awful lot of repetitive code to achieve the same result. Let's unpick both these methods one by one and look at examples.

Rollup

Let's assume you have a dataset with product type, year, quarter and quantity columns. You have been asked to create a single dataset containing the following combinations of aggregations – total quantity

  • by product type, year and quarter
  • by product type and year
  • by product type
  • finally, the total quantity for all product types, all years and quarters.

One way would be to run a separate groupBy for each level (plus a plain aggregation for the grand total) and then union the results together. The other way would be to use – guess what – a rollup.
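For reference, here is a rough sketch of the manual approach (not from the original post; it assumes a dataset ds with the ProductLine, Year, Quarter and Quantity columns used in the code further below):

import org.apache.spark.sql.functions._

//One aggregation per level; missing columns are padded with nulls so the schemas line up
val cols = Seq("ProductLine", "Year", "Quarter", "totalQuantity")
val byQuarter = ds.groupBy("ProductLine","Year","Quarter")
  .agg(sum("Quantity") as "totalQuantity")
val byYear = ds.groupBy("ProductLine","Year")
  .agg(sum("Quantity") as "totalQuantity")
  .withColumn("Quarter", lit(null).cast("string"))
  .select(cols.map(col): _*)
val byProductLine = ds.groupBy("ProductLine")
  .agg(sum("Quantity") as "totalQuantity")
  .withColumn("Year", lit(null).cast("string"))
  .withColumn("Quarter", lit(null).cast("string"))
  .select(cols.map(col): _*)
val grandTotal = ds.agg(sum("Quantity") as "totalQuantity")
  .withColumn("ProductLine", lit(null).cast("string"))
  .withColumn("Year", lit(null).cast("string"))
  .withColumn("Quarter", lit(null).cast("string"))
  .select(cols.map(col): _*)

//Stitch the four levels back together - exactly what rollup does in one call
val manualRollup = byQuarter.union(byYear).union(byProductLine).union(grandTotal)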

A rollup calculates subtotals and a grand total over an ordered combination of columns. What does that mean?

Let's take our earlier example of four columns. Quantity is the column that will get aggregated; ProductLine, Year and Quarter are the grouping columns.

When a rollup is applied to these grouping columns, Quantity is aggregated over the following combinations. Observe the order: these are not all possible combinations, just a simple ordered list obtained by dropping the right-most column one at a time.

  • ProductLine, Year, Quarter
  • ProductLine, Year
  • ProductLine
  • no grouping columns at all, i.e. the grand total

If you have got this far, all that remains is the syntax, which is very simple. See below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkDataSetsMulDimAggregates {
  def main(args: Array[String]): Unit = {
    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Multi-Dimensional Aggregates")
      .getOrCreate

    //Step 2 - Read products_v2.csv and pre-aggregate Quantity by ProductLine, Year and Quarter
    val ds = spark.read
      .option("header",true)
      .option("delimiter",",")
      .csv("products_v2.csv")
      .groupBy("ProductLine","Year","Quarter")
      .agg(sum("Quantity") as "Quantity")

    //Step 3 - Create a rollup
    val resultDS = ds.rollup("ProductLine","Year","Quarter")
      .agg(sum("Quantity") as "totalQuantity")

    //Step 4 - Spark Actions - Show the results
    import spark.implicits._
    resultDS.orderBy($"ProductLine".asc_nulls_first,$"Year".asc_nulls_first, $"Quarter".asc_nulls_first).show(20,false)
  }
}
  • Step 1 creates a Spark Session
  • Step 2 reads the data file.
  • Step 3 is where we call the rollup with a sum of quantity.
  • Step 4 Spark Action – orders and shows the data

The relevant part of the output log is shown below
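In the rolled-up output, the subtotal and grand-total rows show null in the columns that were rolled up. If you would rather label those rows explicitly than infer them from nulls, Spark's grouping_id function can be added to the aggregation. A minimal sketch, reusing the same ds as above:

//grouping_id() returns a bit mask of the rolled-up columns
//(ProductLine is the most significant bit, Quarter the least)
val labelledDS = ds.rollup("ProductLine","Year","Quarter")
  .agg(sum("Quantity") as "totalQuantity", grouping_id() as "groupingLevel")
//groupingLevel: 0 = detail row, 1 = Quarter rolled up,
//3 = Year and Quarter rolled up, 7 = grand total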

Cube

Cube is slightly different from rollup. It calculates subtotals and a grand total for every combination of the columns specified: all possible groupBy combinations are created and unioned together. According to Spark, this should be more efficient than creating separate datasets and then doing a union. Let's take the same dataset as we used in the previous example and see how it works.

Creating all possible combinations of ProductLine, Year and Quarter looks something like this:

  • ProductLine, Year, Quarter
  • ProductLine, Year
  • ProductLine, Quarter
  • Year, Quarter
  • ProductLine
  • Year
  • Quarter
  • no grouping columns at all, i.e. the grand total

If you have understood the above, the implementation could not be simpler. See below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkDataSetsMulDimAggregates {
  def main(args: Array[String]): Unit = {
    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Multi-Dimensional Aggregates")
      .getOrCreate

    //Step 2 - Read products_v2.csv and pre-aggregate Quantity by ProductLine, Year and Quarter
    val ds = spark.read
      .option("header",true)
      .option("delimiter",",")
      .csv("products_v2.csv")
      .groupBy("ProductLine","Year","Quarter")
      .agg(sum("Quantity") as "Quantity")

    //Step 3 - Create a cube
    val resultDS = ds.cube("ProductLine","Year","Quarter")
      .agg(sum("Quantity") as "totalQuantity")

    //Step 4 - Show the results
    import spark.implicits._
    resultDS.orderBy($"ProductLine".asc_nulls_last,$"Year".asc_nulls_last, $"Quarter".asc_nulls_last).show(30,false)
  }
}
  • Step 1 creates a Spark Session
  • Step 2 reads the data file.
  • Step 3 is where we call the cube with a sum of quantity.
  • Step 4 Spark Action – orders and shows the data

The generated dataset is too big to screenshot in full, but you can run the code and print the entire dataset to observe how the various combinations are generated. The relevant part of the log is shown below.
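A practical note (this sketch assumes the grouping columns themselves contain no nulls): individual slices can be pulled straight out of the cube result by filtering on which columns are null. For example, the total quantity per year:

//Rows where only Year is populated are the per-year totals
//across all product lines and quarters
val perYearTotals = resultDS
  .where($"ProductLine".isNull && $"Year".isNotNull && $"Quarter".isNull)
  .select($"Year", $"totalQuantity")
perYearTotals.show(false)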


The datasets for this blog entry are available on this link. Hope this entry has been helpful. With this, we have come to the end of Spark datasets. The next entries will focus on various Spark integrations with Hive, Cassandra and AWS services. Till then …..byeee!
