DataSet Operations – Aggregation – Group Functions

In this entry, we look at how to aggregate data in Spark Datasets to create new datasets. If you have seen the previous post on selecting and filtering data, you will have noticed that these APIs are quite similar to SQL operations. That similarity continues in this entry as well.

Let’s start with the count API. There are a couple of ways to apply it. See below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.{IntegerType, StringType, FloatType}
import org.apache.spark.sql.functions.count

object SparkDataSetAggregates {
  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Aggregates")
      .getOrCreate

    //Step 2 - Create a schema for the file to be read
    val ordersSchema = StructType(
      List(
        StructField("orderkey", IntegerType, true),
        StructField("custkey", StringType, true),
        StructField("orderstatus", StringType, true),
        StructField("totalprice", FloatType, true),
        StructField("orderdate", StringType, true),
        StructField("order_priority", StringType, true),
        StructField("clerk", StringType, true),
        StructField("ship_priority", StringType, true),
        StructField("comment", StringType, true)
      )
    )

    //Step 3 - Read the CSV file - with Options
    val ds = spark.read
      .option("header", false)
      .option("delimiter", "|")
      .schema(ordersSchema)
      .csv("orders.tbl")

    //Step 4 - Select orderkey and status to create a new dataset
    val myOrdersDS = ds.select("orderkey", "orderstatus")

    //Step 5 - Print the count of rows
    println(s"Number of rows is ${myOrdersDS.count}")

    //Step 6 - Using groupBy API to count the rows and create a new dataset
    val aggDS = myOrdersDS.groupBy("orderstatus").agg(count("orderkey") as "order_count")

    aggDS.show()
  }
}

Step 5 – Returns the count of rows in the dataset. The relevant log is below.

Step 6 – Uses the groupBy and agg methods to count the orders by order status. The relevant log is below.
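As an aside, the RelationalGroupedDataset returned by groupBy also offers a count() shorthand that is equivalent to agg(count(...)), except the result column is always named "count". Here is a minimal sketch, using a small hypothetical in-memory dataset in place of orders.tbl:

```scala
import org.apache.spark.sql.SparkSession

object GroupByCountShorthand {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("GroupBy Count Shorthand")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical in-memory stand-in for the orders.tbl data
    val myOrdersDS = Seq((1, "O"), (2, "F"), (3, "O"), (4, "P"))
      .toDF("orderkey", "orderstatus")

    // groupBy(...).count() is shorthand for agg(count(...));
    // the result column is literally named "count"
    myOrdersDS.groupBy("orderstatus").count().show()

    spark.stop()
  }
}
```

The shorthand is convenient for quick checks; use agg(count(...) as "...") when you want control over the output column name.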

Multiple aggregations can be performed in a single groupBy and agg call. In the next example, sum, avg, min and max are applied to the order price.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{count, sum, min, max, avg}

object SparkDataSetAggregates {
  def main(args: Array[String]): Unit = {

    //Step 1 - Create a spark session
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark DataSet Aggregates")
      .getOrCreate

    //Step 2 - Create a schema for the file to be read
    val ordersSchema = StructType(
      List(
        StructField("orderkey", IntegerType, true),
        StructField("custkey", StringType, true),
        StructField("orderstatus", StringType, true),
        StructField("totalprice", DoubleType, true),
        StructField("orderdate", StringType, true),
        StructField("order_priority", StringType, true),
        StructField("clerk", StringType, true),
        StructField("ship_priority", StringType, true),
        StructField("comment", StringType, true)
      )
    )

    //Step 3 - Read the CSV file - with Options
    val ds = spark.read
      .option("header", false)
      .option("delimiter", "|")
      .schema(ordersSchema)
      .csv("orders.tbl")

    //Step 4 - Select order key, order status and price to create a new dataset
    val myOrdersDS = ds.select("orderkey", "orderstatus", "totalprice")

    //Step 5 - Use the groupBy API to compute count, sum, min, max and avg, creating a new dataset
    val aggDS = myOrdersDS.groupBy("orderstatus")
      .agg(count("orderkey") as "orderCount", sum("totalprice") as "totalValue",
        min("totalprice") as "minPrice",
        max("totalprice") as "maxPrice",
        avg("totalprice") as "avgPrice"
      )

    //Step 6 - Show the top 10 rows of the dataset, without truncating column values
    aggDS.show(10, false)
  }
}

The relevant portion of the log is shown below.
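Because these group functions mirror SQL so closely, the same aggregation can also be expressed as a SQL statement over a temporary view. Here is a sketch, again using a small hypothetical in-memory dataset in place of orders.tbl:

```scala
import org.apache.spark.sql.SparkSession

object SqlEquivalentAggregation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SQL Equivalent Aggregation")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical in-memory stand-in for the orders.tbl data
    val myOrdersDS = Seq(
      (1, "O", 100.0), (2, "F", 50.0), (3, "O", 25.0)
    ).toDF("orderkey", "orderstatus", "totalprice")

    // Register the dataset as a temporary view and run the equivalent SQL
    myOrdersDS.createOrReplaceTempView("orders")
    spark.sql(
      """SELECT orderstatus,
        |       COUNT(orderkey) AS orderCount,
        |       SUM(totalprice) AS totalValue,
        |       MIN(totalprice) AS minPrice,
        |       MAX(totalprice) AS maxPrice,
        |       AVG(totalprice) AS avgPrice
        |FROM orders
        |GROUP BY orderstatus""".stripMargin).show(10, false)

    spark.stop()
  }
}
```

Both forms go through the same Catalyst optimizer, so the choice between the Dataset API and SQL is largely a matter of style.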

Hope you have found this post interesting.

Till next time ….Byeee!
