Before we dive headfirst into Window functions, it would be a good idea to review how ordinary aggregations behave: when a groupBy is applied, Spark collapses each group into a single summary row, reducing the number of rows in the result set. However, when Window functions are applied, they do not reduce the rows in the result set. A window describes the set of rows on which the window function operates, and the function returns a value which is shown in all the rows of the window. One more thing: Window functions also help us compare the current row with other rows.
Simple Aggregations
Below is an example showing two operations, to contrast the results:
- the avg function when a groupBy is applied
- the avg function when a window is applied
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object SparkDataSetWindowFunctions {

  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Window Functions")
      .getOrCreate()

    //Step 2 - Read the products.csv
    val ds = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("products.csv")

    //Step 3 - Get the average quantity sold for each product type.
    //This step is only for showing the reduction in the number of rows.
    val productTypeDS = ds.groupBy("ProductType")
      .agg(avg("Quantity") as "avg_Quantity")
      .select("ProductType", "avg_Quantity")
    productTypeDS.show(10, false)

    //Step 4 - Create a Window expression.
    val window = Window.partitionBy("ProductType")

    //Step 5 - Apply the avg aggregation over the window.
    val finalDS = ds.withColumn("avg_Quantity", avg("Quantity").over(window))

    //Step 6 - Show the result.
    finalDS.orderBy("ProductType", "Product").show(20, false)
  }
}
Let’s look at Steps 4 & 5, which are the most important in this example.
- Steps 1, 2 – Create a spark session and read the data file.
- Step 3 – Do a groupBy average. This step is only there to show the reduction in the number of rows.
- Step 4 – Create a Window expression – also called the Window Spec – which tells spark to create sets of rows based on the values of the ProductType column.
- Step 5 – Apply the Window Spec over an average aggregation.
Observe that there is no groupBy method call when the window function is applied.
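If you are more comfortable with SQL, here is a minimal sketch of the equivalent query. The temp view name products is an assumption for illustration only; it is not part of the program above.

ds.createOrReplaceTempView("products")  //hypothetical view name
spark.sql(
  """SELECT ProductType, Product, Quantity,
    |       avg(Quantity) OVER (PARTITION BY ProductType) AS avg_Quantity
    |FROM products""".stripMargin)
  .show(20, false)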
The results from the log file are shown below.


Observe that all the rows are returned and the average quantity (avg_Quantity) is generated for every Product Type (ProductType). Each product type has the same value for average quantity. Next, let’s look at some more interesting examples, like applying a rank & dense_rank.
Rank & Dense Rank
Let’s look at another interesting example of ranking the data based on its value in a Window. Here, we want to rank the rows by quantity within each Product Type window.
- The rank function assigns a ranking within an ordered window. If the values of two rows are the same, the rank function assigns the same rank to both, with the next ranking(s) skipped.
- The dense_rank function also assigns a ranking within an ordered window, but the ranks are consecutive. In other words, the same rank is assigned to tied rows, and no ranks are skipped, unlike rank.
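Before the full program, here is a minimal sketch over a tiny in-memory dataset to make the difference concrete. The values are made up purely for illustration, and the spark session and imports are assumed from the listings in this post.

import spark.implicits._

//Two rows tie on Quantity = 20.
val demo = Seq(("Tents", 10), ("Tents", 20), ("Tents", 20), ("Tents", 30))
  .toDF("ProductType", "Quantity")
val w = Window.partitionBy("ProductType").orderBy("Quantity")
demo.withColumn("rankForQty", rank().over(w))
  .withColumn("denseRankForQty", dense_rank().over(w))
  .show()
//Quantity 10 -> rank 1, dense_rank 1
//Quantity 20 -> rank 2, dense_rank 2 (both tied rows)
//Quantity 30 -> rank 4 (rank 3 is skipped), dense_rank 3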
Let’s look at an example showing both the functions together.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object SparkDataSetWindowFunctions {

  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Window Functions")
      .getOrCreate()

    //Step 2 - Read the products.csv
    val ds = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("products.csv")

    //Step 3 - Create a Window expression, partitioned by ProductType and ordered by Quantity.
    val window = Window.partitionBy("ProductType").orderBy("Quantity")

    //Step 4 - Apply the rank and dense_rank functions over the window.
    val finalDS = ds.withColumn("rankForQty", rank().over(window))
      .withColumn("denseRankForQty", dense_rank().over(window))

    //Step 5 - Show the result.
    finalDS.orderBy("ProductType", "Quantity").show(20, false)
  }
}
The interesting part of this program is in Steps 3 & 4.
- Steps 1, 2 – Create a spark session and read the data file.
- Step 3 – Create a Window expression – also called the Window Spec – which tells spark to create sets of rows based on the values of the ProductType column. In addition to that, we use the orderBy function and ask spark to order the rows within each window based on the values in the Quantity column.
- Step 4 – Apply the Window Spec over the rank and dense_rank functions.
- Step 5 – Show the result.
The relevant output from the data is shown below.

Observe how the ranks are created for rankForQty and denseRankForQty. When two rows have the same Quantity, both get the same rank; rankForQty then skips the next rank value, while denseRankForQty continues without a gap.
Cumulative Aggregations
Turning the page, let’s now take up another use case which comes along a lot: cumulative aggregations. A very commonly needed example is a cumulative, or running, total. Let’s take a look at how we can create one.
See the example below, which computes a running total of the Quantity column.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object SparkDataSetWindowFunctions {

  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Window Functions")
      .getOrCreate()

    //Step 2 - Read the products.csv
    val ds = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("products.csv")

    //Step 3 - Create a Window expression with a frame from the partition start to the current row.
    val window = Window.partitionBy("ProductType")
      .orderBy("Quantity")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    //Step 4 - Apply the sum aggregation over the window.
    val finalDS = ds.withColumn("cumulativeQty", sum("Quantity").over(window))

    //Step 5 - Show the result.
    finalDS.orderBy("ProductType", "Quantity").show(20, false)
  }
}
Here we introduce a new method on the Window Spec – rowsBetween(start, end) – which defines the frame of rows, within a partition, that the aggregation operates on.
- Start – In a partition, where the frame starts with respect to the current row. For a cumulative sum, the parameter is Window.unboundedPreceding.
- End – In a partition, where the frame ends with respect to the current row. For a cumulative sum, the parameter is Window.currentRow.
Essentially, we are telling spark to go all the way to the start of the Window and, from there, look up to the current row.
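Here is a minimal sketch over a tiny in-memory dataset (made-up values; the spark session and imports are assumed from the listing above) showing how the frame grows row by row:

import spark.implicits._

val demo = Seq(("Tents", 5), ("Tents", 10), ("Tents", 20))
  .toDF("ProductType", "Quantity")
val w = Window.partitionBy("ProductType").orderBy("Quantity")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
demo.withColumn("cumulativeQty", sum("Quantity").over(w)).show()
//Quantity 5  -> frame [5]          -> cumulativeQty 5
//Quantity 10 -> frame [5, 10]      -> cumulativeQty 15
//Quantity 20 -> frame [5, 10, 20]  -> cumulativeQty 35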
The output of the relevant part of the log is shown below. Observe that cumulativeQty is a running total of the Quantity column.

Lead & Lag Functions
The next set of functions allows us to compare the current row with the next row(s) or the previous row(s) in a Window. This is done by providing an offset to compare, relative to the current row.
For example, if we want to compare the quantity of the current row with the quantity in the next row, we just say lead("Quantity", 1). Similarly, if we want to compare the quantity with the second row after the current row, we just say lead("Quantity", 2). The lag function works in exactly the same way, but in the other direction.
See below an example showing lead and lag functions in action.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object SparkDataSetWindowFunctions {

  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Window Functions")
      .getOrCreate()

    //Step 2 - Read the products.csv
    val ds = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("products.csv")

    //Step 3 - Create a Window expression.
    val window = Window.partitionBy("ProductType")
      .orderBy("Quantity")

    //Step 4 - Apply the lead & lag functions over the window.
    val finalDS = ds.withColumn("qtyInNextRow", lead("Quantity", 1).over(window))
      .withColumn("qtyIn2ndRow", lead("Quantity", 2).over(window))
      .withColumn("qtyInPrevRow", lag("Quantity", 1).over(window))
      .withColumn("qtyIn2ndRowBehindCurrentRow", lag("Quantity", 2).over(window))

    //Step 5 - Show the result.
    finalDS.orderBy("ProductType", "Quantity").show(20, false)
  }
}
Take a close look at Step 4 and it should be self-explanatory by now. In case there is no value found by the lead or the lag function – for example, at the start or end of a partition – null is returned.
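As a side note, lead and lag also accept an optional third argument: a default value to use instead of null. Here is a minimal sketch, reusing ds and window from the listing above; casting Quantity to an int is my assumption, since the CSV columns are read as strings.

//Return 0 instead of null when the offset runs past the partition boundary.
val qty = col("Quantity").cast("int")
val withDefaults = ds
  .withColumn("qtyInNextRow", lead(qty, 1, 0).over(window))
  .withColumn("qtyInPrevRow", lag(qty, 1, 0).over(window))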
The relevant part of the output is shown below.

First and Last Value
Finally, we come to the last two methods in the Window functions; they give us the first value and the last value of a column in a Window. They are pretty simple to implement. See the code below.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object SparkDataSetWindowFunctions {

  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Window Functions")
      .getOrCreate()

    //Step 2 - Read the products.csv
    val ds = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("products.csv")

    //Step 3 - Create a Window expression whose frame spans the whole partition.
    val window = Window.partitionBy("ProductType")
      .orderBy("Quantity")
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    //Step 4 - Apply the first and last functions over the window.
    val finalDS = ds.withColumn("qtyFirstValue", first("Quantity").over(window))
      .withColumn("qtyLastValue", last("Quantity").over(window))

    //Step 5 - Show the result.
    finalDS.orderBy("ProductType", "Quantity").show(20, false)
  }
}
Essentially, we are telling spark to start from the first row of the Window and go all the way to the end of the Window while it looks for the first and the last value. The explicit rowsBetween call matters here: when a window has an orderBy but no explicit frame, spark defaults the frame to end at the current row, so last would simply return the current row's value.
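To see why the explicit frame matters, here is a sketch of the pitfall (not part of the original example; ds is assumed from the listing above):

//With orderBy but no explicit frame, spark's default frame ends at the
//current row, so last simply echoes the current row's value.
val orderedOnly = Window.partitionBy("ProductType").orderBy("Quantity")
val naive = ds.withColumn("qtyLastValue", last("Quantity").over(orderedOnly))
//Here qtyLastValue equals Quantity on every row; the rowsBetween in Step 3
//makes the frame span the whole partition instead.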
The relevant log is shown below.

If you are interested in learning more, I advise you to look at the spark documentation and search for Window functions.
A sample of the data for this blog is given below. It is a subset of data published by IBM for Watson Analytics. Note that the examples read products.csv with header set to true and reference the columns ProductType, Product, and Quantity, so the file needs a header row containing those column names (the names of the remaining columns do not matter for these examples).
Camping Equipment,Cooking Gear,TrailChef Deluxe Cook Set,2012,Q1 2012,59628.66,489,0.34754797
Camping Equipment,Cooking Gear,TrailChef Double Flame,2012,Q1 2012,35950.32,252,0.4742745
Camping Equipment,Tents,Star Dome,2012,Q1 2012,89940.48,147,0.35277197
Camping Equipment,Tents,Star Gazer 2,2012,Q1 2012,165883.41,303,0.28293788
Camping Equipment,Sleeping Bags,Hibernator Lite,2012,Q1 2012,119822.2,1415,0.29145017
Camping Equipment,Sleeping Bags,Hibernator Extreme,2012,Q1 2012,87728.96,352,0.39814629
Camping Equipment,Sleeping Bags,Hibernator Camp Cot,2012,Q1 2012,41837.46,426,0.33560737
Camping Equipment,Lanterns,Firefly Lite,2012,Q1 2012,8268.41,577,0.52896022
Camping Equipment,Lanterns,Firefly Extreme,2012,Q1 2012,9393.3,189,0.43420523
Camping Equipment,Lanterns,EverGlow Single,2012,Q1 2012,19396.5,579,0.46149254
Camping Equipment,Lanterns,EverGlow Butane,2012,Q1 2012,6940.03,109,0.36186587
Mountaineering Equipment,Rope,Husky Rope 50,2012,Q1 2012,20003.2,133,0.32905585
Mountaineering Equipment,Rope,Husky Rope 60,2012,Q1 2012,14109.4,79,0.29165733
Mountaineering Equipment,Rope,Husky Rope 100,2012,Q1 2012,73970.22,227,0.30126435
Mountaineering Equipment,Rope,Husky Rope 200,2012,Q1 2012,77288.64,143,0.31477575
Mountaineering Equipment,Safety,Granite Climbing Helmet,2012,Q1 2012,62464.88,898,0.24468085
Mountaineering Equipment,Safety,Husky Harness,2012,Q1 2012,34154.9,559,0.28363339
Mountaineering Equipment,Safety,Husky Harness Extreme,2012,Q1 2012,36396.8,352,0.47843327
Mountaineering Equipment,Safety,Granite Signal Mirror,2012,Q1 2012,4074.84,126,0.51422387
Mountaineering Equipment,Climbing Accessories,Granite Carabiner,2012,Q1 2012,15122.72,4022,0.4787234
Mountaineering Equipment,Climbing Accessories,Granite Belay,2012,Q1 2012,19476.8,296,0.47613982
Mountaineering Equipment,Climbing Accessories,Granite Pulley,2012,Q1 2012,15739.22,427,0.50217037
Mountaineering Equipment,Climbing Accessories,Firefly Climbing Lamp,2012,Q1 2012,17998.56,464,0.42768755
Mountaineering Equipment,Climbing Accessories,Firefly Charger,2012,Q1 2012,36494,710,0.56498054
Mountaineering Equipment,Climbing Accessories,Firefly Rechargeable Battery,2012,Q1 2012,11673.6,1520,0.58984375
Mountaineering Equipment,Climbing Accessories,Granite Chalk Bag,2012,Q1 2012,4621.68,262,0.51643991
Mountaineering Equipment,Tools,Granite Ice,2012,Q1 2012,25041.6,333,0.48178191
Mountaineering Equipment,Tools,Granite Hammer,2012,Q1 2012,18118.38,241,0.2434158
Mountaineering Equipment,Tools,Granite Shovel,2012,Q1 2012,9543.16,164,0.33699948
Mountaineering Equipment,Tools,Granite Grip,2012,Q1 2012,10146.2,523,0.49020619
Mountaineering Equipment,Tools,Granite Axe,2012,Q1 2012,32870.4,856,0.49166667
Mountaineering Equipment,Tools,Granite Extreme,2012,Q1 2012,42563.2,566,0.38138298
Personal Accessories,Watches,Mountain Man Extreme,2012,Q1 2012,6499.8,23,0.5888535
Personal Accessories,Eyewear,Polar Sun,2012,Q1 2012,7015.34,116,0.56760471
Personal Accessories,Eyewear,Polar Ice,2012,Q1 2012,3825.8,37,0.51943907
Personal Accessories,Knives,Edge Extreme,2012,Q1 2012,30940.25,275,0.28895209
Personal Accessories,Knives,Bear Survival Edge,2012,Q1 2012,8414.75,97,0.48126801
Personal Accessories,Navigation,Glacier GPS Extreme,2012,Q1 2012,38534.28,114,0.4779303
Outdoor Protection,Insect Repellents,BugShield Extreme,2012,Q1 2012,25010.58,3801,0.63221884
Outdoor Protection,Sunscreen,Sun Shelter Stick,2012,Q1 2012,5718.15,1179,0.59587629
Outdoor Protection,First Aid,Compact Relief Kit,2012,Q1 2012,4057.2,180,0.60070985
Outdoor Protection,First Aid,Aloe Relief,2012,Q1 2012,711.28,136,0.63288719
Hope you have found this blog entry useful.
Till next time…..Byeee!