In this entry, we look at how we can aggregate data in Spark datasets to create new datasets. If you have seen the previous post on selecting and filtering data, you would have realised that these operations also produce new datasets; aggregation works the same way.
Let’s start with the count API. There are a couple of ways to apply it. See below.
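Before the full listing, here is a minimal sketch of both forms, assuming a dataset named ds that already has orderkey and orderstatus columns: count called directly on the dataset as an action, and the count aggregate function used with groupBy and agg.

import org.apache.spark.sql.functions.count

// Action on the dataset itself - returns the number of rows as a Long
val totalRows = ds.count()

// Aggregate function applied per group - returns a new dataset with one row per order status
val countsPerStatus = ds
  .groupBy("orderstatus")
  .agg(count("orderkey") as "order_count")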
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.{IntegerType, StringType, FloatType}
import org.apache.spark.sql.functions.count

object SparkDataSetAggregates {

  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Aggregates")
      .getOrCreate()

    //Step 2 - Create a schema for the file to be read
    val ordersSchema = StructType(
      List(
        StructField("orderkey", IntegerType, true),
        StructField("custkey", StringType, true),
        StructField("orderstatus", StringType, true),
        StructField("totalprice", FloatType, true),
        StructField("orderdate", StringType, true),
        StructField("order_priority", StringType, true),
        StructField("clerk", StringType, true),
        StructField("ship_priority", StringType, true),
        StructField("comment", StringType, true)
      )
    )

    //Step 3 - Read the CSV file - with options
    val ds = spark.read
      .option("header", false)
      .option("delimiter", "|")
      .schema(ordersSchema)
      .csv("orders.tbl")

    //Step 4 - Select orderkey and orderstatus to create a new dataset
    val myOrdersDS = ds.select("orderkey", "orderstatus")

    //Step 5 - Print the count of rows
    println(s"Number of rows is ${myOrdersDS.count}")

    //Step 6 - Use the groupBy API to count the rows and create a new dataset
    val aggDS = myOrdersDS.groupBy("orderstatus").agg(count("orderkey") as "order_count")
    aggDS.show()
  }
}
Step 5 – Returns the count of rows in the dataset. The relevant portion of the log is shown below.

Step 6 – Use the groupBy and agg methods to count the orders by order status.
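As an aside, when all you need is a plain row count per group, the grouped dataset returned by groupBy also offers a count shortcut. A minimal sketch, assuming the myOrdersDS dataset from the listing above; note the resulting column is named count by default rather than order_count.

// Equivalent row count per order status using the built-in count() on the grouped data
val statusCounts = myOrdersDS.groupBy("orderstatus").count()
statusCounts.show()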

Multiple aggregations can be performed via a single groupBy and agg call. In the next example, sum, avg, min and max are applied to the order price.
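Incidentally, agg also accepts (columnName, functionName) string pairs, which can be more compact when custom column aliases are not needed. A minimal sketch, assuming the myOrdersDS dataset built in the listing below; the result columns get default names such as sum(totalprice).

// The same aggregations expressed as column-name / function-name pairs
val aggByPairs = myOrdersDS.groupBy("orderstatus")
  .agg("orderkey" -> "count",
       "totalprice" -> "sum",
       "totalprice" -> "min",
       "totalprice" -> "max",
       "totalprice" -> "avg")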
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{count, sum, min, max, avg}

object SparkDataSetAggregates {

  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Aggregates")
      .getOrCreate()

    //Step 2 - Create a schema for the file to be read
    val ordersSchema = StructType(
      List(
        StructField("orderkey", IntegerType, true),
        StructField("custkey", StringType, true),
        StructField("orderstatus", StringType, true),
        StructField("totalprice", DoubleType, true),
        StructField("orderdate", StringType, true),
        StructField("order_priority", StringType, true),
        StructField("clerk", StringType, true),
        StructField("ship_priority", StringType, true),
        StructField("comment", StringType, true)
      )
    )

    //Step 3 - Read the CSV file - with options
    val ds = spark.read
      .option("header", false)
      .option("delimiter", "|")
      .schema(ordersSchema)
      .csv("orders.tbl")

    //Step 4 - Select order key, order status and price to create a new dataset
    val myOrdersDS = ds.select("orderkey", "orderstatus", "totalprice")

    //Step 5 - Use the groupBy API to count, sum, min, max and avg the order prices and create a new dataset
    val aggDS = myOrdersDS.groupBy("orderstatus")
      .agg(count("orderkey") as "orderCount",
        sum("totalprice") as "totalValue",
        min("totalprice") as "minPrice",
        max("totalprice") as "maxPrice",
        avg("totalprice") as "avgPrice"
      )

    //Step 6 - Show the top 10 rows of the dataset
    aggDS.show(10, false)
  }
}
The relevant portion of the log is shown below.

Hope you have found this post interesting.
Till next time ….Byeee!