Scaling Data Engineering using Scala Implicits and Ad-hoc Polymorphism with Apache Spark

Omkar Kulkarni
4 min readApr 28, 2021

--

Often times we see ourselves chaining .read(), .join(), .groupBy() and .agg()for machine learning, recommendations or predictions. I have spent countless hours writing, testing and ultimately debugging these lines of codes to understand which line caused a bad transformation. Not to mention, they are one-off implementations with no reusability. Our goal is to improve these chained implementations by something more reusable and scalable.

Say thanks to Dall-E for the image :)

In this blog, we will discuss how to modularize our spark application using scala’s implicit keyword and ad-hoc polymorphism.

Goal

Our goal is to implement a job first, in a brute-force spark transformation script and then improve it using ad-hoc polymorphism. We will use a farmBot IoT project data as an example.

First let’s write our model classes:

Model Classes

We have 5 case classes:

  1. HeatSensorEvent: A case class for capturing heat sensor data.
  2. DeviceHeartbeat: An event emitted by the device to determine the health of device.
  3. HeatSensorVector : An aggregation of HeatSensorEvent dataset using custom transformation and aggregation.
  4. DeviceHeatSensorVector : Output of join between HeatSensorVector and DeviceHeartbeat
  5. FarmBotActivity: A general farmBot’s activity dataset that will be used in conjunction with DeviceHeatSensorVector for predictions.

How predictions and models are built is out of scope for this article. We will focus on how to streamline our data transformation operations instead.

Let’s generate some fake data for our use:

Fake Data For our Example

Now that the model classes and data is ready, we define our ops to:

  1. Aggregate HeatSensorEvent stream into:
    case class HeatSensorVector(sensorId: Id, deviceId: Id, temperatureVector: Vector[Double])
  2. Join Dataset[DeviceHeartBeat]with Dataset[HeatSensorEvent] to construct Dataset[DeviceHeartBeatVector]
  3. Repeat step 1 to build an aggregate dataset of Dataset[DeviceHeartBeatVector], Device[FarmBotActivity] for predicting our bot’s resource requirements.

Let’s first implement our required transformation using basic spark dataset api’s.

Brute Force Implementation

Brute Force Implementation of our Transformations using Spark’s Scala APIs

For a single use case, this maybe the best way to go. No hassle, no extra implementation. Probably most engineers would also like to see the same.

Now, let’s imagine for another project, we need to use Dataset[DeviceHeartBeat] and Dataset[HeatSensorEvent] data to aggregate and join to extend our analysis with the third dataset, Dataset[FarmBotActivity].

Oops!!!!! Bummer. We have 2 options now:

  1. We either move our spark app into a function and re-use it at two different places OR
  2. Copy-paste the code to read the datasets again for our new transformation.

Second option is straight up against DRY principle. But even the first option is hard and impossible to maintain as now we cannot test the function in isolation as we are using sparkSession to read and generate dataset. This is where scala’s Ad-hoc Polymorphism comes to rescue. Keep reading…

Optimizing Our Snippet using Ad-hoc polymorphism

There are excellent articles on what is ad-hoc polymorphism and how Scala supports it. Here’s an article that simplifies and explains this concept for newbies: https://dzone.com/articles/scala-ad-hoc-polymorphism-explained.

In short, ad-hoc polymorphism is:

With generic, we specify a generic code to run on various types, and with ad hoc polymorphism, we can change the generic code run based on the actual type polymorphed on!

That being said, now we need to think about how can we improve our chained functional paradigm into a more re-usable well tested application? Before we dive into optimizing our code, lets list out the api’s we are using:

  1. .read()
  2. .groupByKey()/groupBy()
  3. .agg()
  4. .select()

Implementing IoTReader[T] to Read Datasets

An IoTReader[T] is a trait that exposes:

def read(paths: Any): Dataset[T]

On line 12, 23 and 36, we are implementing implicit object that extend IoTReader[T]. With these objects, Scala can during runtime match and determine which dataset to read. So our read() api’s can now be called simply with:

val heatSensorEventDs = IoTReader.read[HeatSensorEvent](paths)
val deviceHeartbeatDs = IoTReader.read[DeviceHeartbeat](paths)
val farmBotActivityDs = IoTReader.read[FarmBotActivity](paths)

Implementing IoTJoiner[T, U, V] instead of join

Similarly, an IoTJoiner is a trait that exposes:

def join(left: Dataset[T], right: Dataset[U])(implicit sparkSession: SparkSession): Dataset[V]

Just like our IoTReader , we can extend IoTJoiner[T, U, V] to implement DeviceHeatSensorVectorIoTJoiner.

Using our DeviceHeatSensorVectorIoTJoiner, our joins will them become:

val deviceHeatSensorVectorDs = IoTJoiner.join(deviceHeartbeatDs, heatSensorVectorDs)

Implementing IoTAggregator[T, U] instead of groupBy and agg

Our third commonly used operation of aggregation can similarly be converted into a trait that exposes:

def aggregate(in: Dataset[T])(implicit sparkSession: SparkSession): Dataset[U]
NOTE: For extra type safety, note that I have implemented the Aggregator interface of Apache spark on line 8. If type-safety is not a concern, you can replace it by collect_set/list.

Similar to IoTReader and IoTJoiner implicit objects, we have created an implicit object called HeatSensorEventAggregator which can be used as:

val heatSensorVectorDs = IoTAggregator.aggregate(heatSensorEventDs)

Rewriting our Brute Force Implementation

The same snippet that took at least around 30 lines can now be crunched to merely 6 lines:

Great Job!!!! Now use can reuse your readers, joiners and aggregators in any project as a library and reduce the pain or rewriting heavy spark transformations!

Do you need these implicit ad-hoc polymorphic factories?

It depends. I believe there are 2 cases where teams can vastly benefit from such implementations:

  1. Data governance, privacy and retention policies play a big role in usage of data: Here, a single team can keep a tight control on these api’s and instead of exposing the warehouse directly, they can instead maintain these API’s. Of course there is overhead of maintaining them, but at the hindsight, we are reducing 10 times possible code duplication across multiple teams.
  2. When schemas of datasets are too complex: Users can vastly benefit from well-documented API’s as they are much more readable and easy to use. A machine learning or data scientist’s primary concern is to build, train and test models. He/She doesn’t need to investigate and learn how to prepare or fetch data.

Last but not the least, every single line in the OptimalCodeWithIoTFactories.scala snippet can be individually tested and relied upon with standardized versioning and artifacts before publishing!!!!

Let me know what do you think about these implementations. All comments are welcome! Thanks for reading.

--

--

Responses (2)