DataSet Operations – Aggregation – Window Functions

Before we dive headfirst into Window functions, it is a good idea to review the groupBy aggregate functions. An important thing to note: when groupBy aggregations are applied, they reduce the number of rows returned.

However, when Window functions are applied, they do not reduce the number of rows in the result set. A Window describes the set of rows on which the window function operates, and the function returns a value which is shown in all the rows of the window. Window functions also let us compare the current row with other rows.

Simple Aggregations

Below is an example showing two operations to contrast the results:

  • avg function when a groupBy is applied
  • avg function when a window is applied

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object SparkDataSetWindowFunctions {
  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Window Functions")
      .getOrCreate

    //Step 2 - Read the products.csv
    val ds = spark.read
      .option("header",true)
      .option("delimiter",",")
      .csv("products.csv")

    //Step 3 - Get the average quantity sold for each product type.
    //This step is only for showing the reduction in the number of rows.
    val productTypeDS = ds.groupBy("ProductType")
      .agg(avg("Quantity") as "avg_Quantity")
      .select("ProductType","avg_Quantity")

    productTypeDS.show(10,false)

    //Step 4 - Create a Window expression.
    val window = Window.partitionBy("ProductType")
   
    //Step 5 - Apply the window on avg aggregation method.
    val finalDS = ds.withColumn("avg_Quantity", avg("Quantity").over(window))

    //Step 6 - Show the result.
    finalDS.orderBy("ProductType","Product").show(20, false)
  }
}

Let’s look at Steps 4 & 5, which are the most important in this example.

  • Step 1, 2 – Create a Spark session and read the data file.
  • Step 3 – Compute a groupBy average (only to show the reduction in the number of rows).
  • Step 4 – Create a Window expression – also called the Window Spec – which tells Spark to create sets of rows based on the values of the ProductType column.
  • Step 5 – Apply the Window Spec over an average aggregation.

Observe that there is no groupBy method call when the window function is applied.
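For comparison, the same window aggregation can also be expressed in Spark SQL, and there is no GROUP BY there either. Below is a minimal sketch, assuming the DataSet has been registered as a temporary view (the view name products is chosen here purely for illustration):

//Register the DataSet as a temporary view and express the window in SQL
ds.createOrReplaceTempView("products")

val sqlDS = spark.sql(
  """SELECT ProductType, Product, Quantity,
    |       AVG(Quantity) OVER (PARTITION BY ProductType) AS avg_Quantity
    |FROM products""".stripMargin)

sqlDS.show(20, false)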

The results from the log of the program above are shown below.

GroupBy aggregation

Window Function based aggregation

Observe that all the rows are returned and the average quantity (avg_Quantity) is generated for every product type (ProductType). Rows with the same product type share the same value for average quantity.

Let’s now look at some more interesting examples: applying rank & dense_rank.

Rank & Dense Rank

Let’s look at another interesting example: ranking the data based on its value within a Window. Here we want to rank the rows by quantity within each product type. We will use two functions, rank and dense_rank.

  • The rank function assigns a ranking within an ordered window. If two rows have the same value, they are assigned the same rank, and the subsequent rank(s) are skipped.
  • The dense_rank function also assigns a ranking within an ordered window, but the ranks are consecutive. In other words, rows with the same value are assigned the same rank, and no ranks are skipped.

Let’s look at an example showing both the functions together.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object SparkDataSetWindowFunctions {
  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Window Functions")
      .getOrCreate

    //Step 2 - Read the products.csv
    val ds = spark.read
      .option("header",true)
      .option("delimiter",",")
      .csv("products.csv")

    //Step 3 - Create a Window expression.
    val window = Window.partitionBy("ProductType").orderBy("Quantity")

    //Step 4 - Apply the window on rank and dense_rank aggregation method.
    val finalDS = ds.withColumn("rankForQty", rank.over(window))
      .withColumn("denseRankForQty",dense_rank().over(window))

    //Step 5 - Show the result.
    finalDS.orderBy("ProductType","Quantity").show(20, false)
  }
}

The interesting part of this program is in Steps 3 & 4.

  • Step 1, 2 – Create a Spark session and read the data file.
  • Step 3 – Create a Window expression – also called the Window Spec – which tells Spark to create sets of rows based on the values of the ProductType column. In addition, the orderBy function asks Spark to order the rows within each set based on the values in the Quantity column.
  • Step 4 – Apply the Window Spec over the rank and dense_rank functions.
  • Step 5 – Show the result.

The relevant part of the output is shown below.

rank and dense_rank

Observe how the ranks are created for rankForQty and denseRankForQty. When the rank function is applied, rows with the same quantity are assigned the same rank and the subsequent ranks are skipped: where the quantities are 100, 100, 101, the ranks are 1, 1, 3 – rank 2 is skipped. dense_rank does not skip ranks, so for the same quantities 100, 100, 101 the dense ranks are 1, 1, 2.
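One thing to note: orderBy("Quantity") sorts the window in ascending order by default, which is why the smallest quantity receives rank 1 above. If you want the largest quantity to receive rank 1, order the window in descending order instead. A minimal sketch, reusing the same ds (and casting Quantity to an integer, since the CSV columns are read as strings and would otherwise be ordered lexicographically):

//Window ordered by Quantity in descending (numeric) order - the largest quantity gets rank 1
val windowDesc = Window.partitionBy("ProductType")
  .orderBy(col("Quantity").cast("int").desc)

val rankedDescDS = ds.withColumn("rankForQty", rank().over(windowDesc))
rankedDescDS.orderBy("ProductType").show(20, false)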

Cumulative Aggregations

Turning a page, let’s now take up another use case which comes up a lot: cumulative aggregations. A very common need is a cumulative or running total. Let’s take a look at how we can create one.

See the example below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object SparkDataSetWindowFunctions {
  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Window Functions")
      .getOrCreate

    //Step 2 - Read the products.csv
    val ds = spark.read
      .option("header",true)
      .option("delimiter",",")
      .csv("products.csv")

    //Step 3 - Create a Window expression.
    val window = Window.partitionBy("ProductType")
      .orderBy("Quantity")
      .rowsBetween(Window.unboundedPreceding,Window.currentRow)

    //Step 4 - Apply the window on sum aggregation method.
    val finalDS = ds.withColumn("cumulativeQty", sum("Quantity").over(window))


    //Step 5 - Show the result.
    finalDS.orderBy("ProductType","Quantity").show(20, false)
  }
}

Here we introduce a new method, rowsBetween, which allows us to create cumulative aggregations. It takes two parameters that define a frame with respect to the current row.

  • Start – within a partition, where the frame starts with respect to the current row. For a cumulative sum, this is Window.unboundedPreceding.
  • End – within a partition, where the frame ends with respect to the current row. For a cumulative sum, this is Window.currentRow.


Essentially, we are telling Spark to go all the way to the start of the Window and, from there, look up to the current row. A variant with a bounded frame is sketched a little further below.

The output of the relevant part of the log is shown below. Observe cumulativeQty with respect to the Quantity column.

cumulativeQty is running total of Quantity Column
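
The frame does not have to start at the beginning of the partition; numeric offsets work as well. As a variant, here is a minimal sketch of a trailing sum over the current row and the two rows before it (the column name trailing3RowQty is chosen here just for illustration):

//Frame covering the two rows before the current row plus the current row itself
val trailingWindow = Window.partitionBy("ProductType")
  .orderBy("Quantity")
  .rowsBetween(-2, Window.currentRow)

val trailingDS = ds.withColumn("trailing3RowQty", sum("Quantity").over(trailingWindow))
trailingDS.orderBy("ProductType","Quantity").show(20, false)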

Lead & Lag Functions

The next set of functions allows us to compare the current row with the next row(s) or the previous row(s) in a Window. This is done by providing an offset, which is relative to the current row.

For example, if we want to compare the quantity of the current row with the quantity in the next row, we say lead("Quantity", 1). Similarly, if we want to compare it with the quantity two rows after the current row, we say lead("Quantity", 2). The lag function works in exactly the same way, but in the other direction.

See below an example showing lead and lag functions in action.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object SparkDataSetWindowFunctions {
  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Window Functions")
      .getOrCreate

    //Step 2 - Read the products.csv
    val ds = spark.read
      .option("header",true)
      .option("delimiter",",")
      .csv("products.csv")

    //Step 3 - Create a Window expression.
    val window = Window.partitionBy("ProductType")
      .orderBy("Quantity")

    //Step 4 - Apply the window on lead & lag method.
    val finalDS = ds.withColumn("qtyInNextRow", lead("Quantity",1).over(window))
      .withColumn("qtyIn2ndRow", lead("Quantity",2).over(window))
      .withColumn("qtyInPrevRow", lag("Quantity",1).over(window))
      .withColumn("qtyIn2ndRowsBehindCurrentRow", lag("Quantity",2).over(window))

    //Step 5 - Show the result.
    finalDS.orderBy("ProductType","Quantity").show(20, false)
  }
}

Take a close look at Step 4; it should be self-explanatory by now. If the lead or the lag method finds no value, because the offset points beyond the rows available in the window, a null is populated. If you want some other default value to be populated instead, it can be passed as part of the method call. See the method signatures for lead below; they are available in org.apache.spark.sql.functions.

def lead(columnName: String, offset: Int, defaultValue: Any): Column
def lead(e: Column, offset: Int, defaultValue: Any): Column
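
For example, to get 0 instead of null when there is no next row, the defaultValue overload can be used. A minimal sketch, reusing the window from Step 3 above:

//lead with a default value - rows with no next row in the window get 0 instead of null
val withDefaultDS = ds.withColumn("qtyInNextRow", lead("Quantity", 1, 0).over(window))
withDefaultDS.orderBy("ProductType","Quantity").show(20, false)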

The relevant part of the output from the lead & lag example is shown below.

First and Last Value

Finally, we come to the last two methods in the Window functions; they give us the first value and the last value of a column in a Window. They are pretty simple to use. See the code below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object SparkDataSetWindowFunctions {
  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Window Functions")
      .getOrCreate

    //Step 2 - Read the products.csv
    val ds = spark.read
      .option("header",true)
      .option("delimiter",",")
      .csv("products.csv")

    //Step 3 - Create a Window expression.
    val window = Window.partitionBy("ProductType")
      .orderBy("Quantity")
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    //Step 4 - Apply the window on first and last value method.
    val finalDS = ds.withColumn("qtyFirstValue", first("Quantity").over(window))
            .withColumn("qtyLastValue", last("Quantity").over(window))

    //Step 5 - Show the result.
    finalDS.orderBy("ProductType","Quantity").show(20, false)
  }
}

Essentially, we are telling Spark to start from the first row of the Window and go all the way to the end of the Window while it looks for the first and the last value.
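
A quick note on the rowsBetween clause in Step 3: it is what makes last return the last value of the whole partition. If the frame is left at its default, a window with an orderBy ends at the current row, so last("Quantity") would typically just echo the current row’s quantity. A minimal sketch of the variant to avoid:

//Without an explicit frame, a window with an orderBy ends at the current row,
//so last would typically return the current row's quantity rather than the partition's last value.
val defaultFrameWindow = Window.partitionBy("ProductType").orderBy("Quantity")
val notTheLastValueDS = ds.withColumn("qtyLastValue", last("Quantity").over(defaultFrameWindow))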

The relevant log from the first and last value example is shown below.

First and Last Value Window Function

If you are interested in learning more, I advise you to start with the Spark documentation and look for Window functions.

A sample of the data for this blog is given below. It is a subset of data published by IBM for Watson Analytics.

ProductLine,ProductType,Product,Year,Quarter,Revenue,Quantity,GrossMargin
Camping Equipment,Cooking Gear,TrailChef Deluxe Cook Set,2012,Q1 2012,59628.66,489,0.34754797
Camping Equipment,Cooking Gear,TrailChef Double Flame,2012,Q1 2012,35950.32,252,0.4742745
Camping Equipment,Tents,Star Dome,2012,Q1 2012,89940.48,147,0.35277197
Camping Equipment,Tents,Star Gazer 2,2012,Q1 2012,165883.41,303,0.28293788
Camping Equipment,Sleeping Bags,Hibernator Lite,2012,Q1 2012,119822.2,1415,0.29145017
Camping Equipment,Sleeping Bags,Hibernator Extreme,2012,Q1 2012,87728.96,352,0.39814629
Camping Equipment,Sleeping Bags,Hibernator Camp Cot,2012,Q1 2012,41837.46,426,0.33560737
Camping Equipment,Lanterns,Firefly Lite,2012,Q1 2012,8268.41,577,0.52896022
Camping Equipment,Lanterns,Firefly Extreme,2012,Q1 2012,9393.3,189,0.43420523
Camping Equipment,Lanterns,EverGlow Single,2012,Q1 2012,19396.5,579,0.46149254
Camping Equipment,Lanterns,EverGlow Butane,2012,Q1 2012,6940.03,109,0.36186587
Mountaineering Equipment,Rope,Husky Rope 50,2012,Q1 2012,20003.2,133,0.32905585
Mountaineering Equipment,Rope,Husky Rope 60,2012,Q1 2012,14109.4,79,0.29165733
Mountaineering Equipment,Rope,Husky Rope 100,2012,Q1 2012,73970.22,227,0.30126435
Mountaineering Equipment,Rope,Husky Rope 200,2012,Q1 2012,77288.64,143,0.31477575
Mountaineering Equipment,Safety,Granite Climbing Helmet,2012,Q1 2012,62464.88,898,0.24468085
Mountaineering Equipment,Safety,Husky Harness,2012,Q1 2012,34154.9,559,0.28363339
Mountaineering Equipment,Safety,Husky Harness Extreme,2012,Q1 2012,36396.8,352,0.47843327
Mountaineering Equipment,Safety,Granite Signal Mirror,2012,Q1 2012,4074.84,126,0.51422387
Mountaineering Equipment,Climbing Accessories,Granite Carabiner,2012,Q1 2012,15122.72,4022,0.4787234
Mountaineering Equipment,Climbing Accessories,Granite Belay,2012,Q1 2012,19476.8,296,0.47613982
Mountaineering Equipment,Climbing Accessories,Granite Pulley,2012,Q1 2012,15739.22,427,0.50217037
Mountaineering Equipment,Climbing Accessories,Firefly Climbing Lamp,2012,Q1 2012,17998.56,464,0.42768755
Mountaineering Equipment,Climbing Accessories,Firefly Charger,2012,Q1 2012,36494,710,0.56498054
Mountaineering Equipment,Climbing Accessories,Firefly Rechargeable Battery,2012,Q1 2012,11673.6,1520,0.58984375
Mountaineering Equipment,Climbing Accessories,Granite Chalk Bag,2012,Q1 2012,4621.68,262,0.51643991
Mountaineering Equipment,Tools,Granite Ice,2012,Q1 2012,25041.6,333,0.48178191
Mountaineering Equipment,Tools,Granite Hammer,2012,Q1 2012,18118.38,241,0.2434158
Mountaineering Equipment,Tools,Granite Shovel,2012,Q1 2012,9543.16,164,0.33699948
Mountaineering Equipment,Tools,Granite Grip,2012,Q1 2012,10146.2,523,0.49020619
Mountaineering Equipment,Tools,Granite Axe,2012,Q1 2012,32870.4,856,0.49166667
Mountaineering Equipment,Tools,Granite Extreme,2012,Q1 2012,42563.2,566,0.38138298
Personal Accessories,Watches,Mountain Man Extreme,2012,Q1 2012,6499.8,23,0.5888535
Personal Accessories,Eyewear,Polar Sun,2012,Q1 2012,7015.34,116,0.56760471
Personal Accessories,Eyewear,Polar Ice,2012,Q1 2012,3825.8,37,0.51943907
Personal Accessories,Knives,Edge Extreme,2012,Q1 2012,30940.25,275,0.28895209
Personal Accessories,Knives,Bear Survival Edge,2012,Q1 2012,8414.75,97,0.48126801
Personal Accessories,Navigation,Glacier GPS Extreme,2012,Q1 2012,38534.28,114,0.4779303
Outdoor Protection,Insect Repellents,BugShield Extreme,2012,Q1 2012,25010.58,3801,0.63221884
Outdoor Protection,Sunscreen,Sun Shelter Stick,2012,Q1 2012,5718.15,1179,0.59587629
Outdoor Protection,First Aid,Compact Relief Kit,2012,Q1 2012,4057.2,180,0.60070985
Outdoor Protection,First Aid,Aloe Relief,2012,Q1 2012,711.28,136,0.63288719

Hope you have found this blog entry useful.

Till next time…..Byeee!
