Scaling Data Engineering using Scala Implicits and Ad-hoc Polymorphism with Apache Spark
Oftentimes we find ourselves chaining .read(), .join(), .groupBy() and .agg() for machine learning, recommendations or predictions. I have spent countless hours writing, testing and ultimately debugging these lines of code to understand which line caused a bad transformation. Not to mention, they are one-off implementations with no reusability. Our goal is to replace these chained implementations with something more reusable and scalable.
In this blog, we will discuss how to modularize our Spark application using Scala's implicit keyword and ad-hoc polymorphism. Our goal is to implement a job first as a brute-force Spark transformation script and then improve it using ad-hoc polymorphism. We will use data from a farmBot IoT project as an example.
First let’s write our model classes:
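The model-class snippet appears as an image in the original post, so it is not reproduced here. Here is a plausible sketch: `Id` as a `String` alias and every field name except `HeatSensorVector`'s (whose signature is quoted verbatim later in the article) are assumptions made for illustration.

```scala
// Sketch only: the original model snippet is an image in the source post.
// `Id` as a String alias and all fields except HeatSensorVector's are assumptions.
object Models {
  type Id = String

  case class HeatSensorEvent(sensorId: Id, deviceId: Id, timestamp: Long, temperature: Double)
  case class DeviceHeartbeat(deviceId: Id, timestamp: Long)
  // This signature is quoted verbatim in the article:
  case class HeatSensorVector(sensorId: Id, deviceId: Id, temperatureVector: Vector[Double])
  case class DeviceHeatSensorVector(deviceId: Id, temperatureVector: Vector[Double])
  case class FarmBotActivity(deviceId: Id, activity: String, timestamp: Long)
}
```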
We have 5 case classes:
- HeatSensorEvent: A case class for capturing heat sensor data.
- DeviceHeartbeat: An event emitted by the device to determine the health of the device.
- HeatSensorVector: An aggregation of the HeatSensorEvent dataset using a custom transformation and aggregation.
- DeviceHeatSensorVector: Output of the join between the DeviceHeartbeat and HeatSensorVector datasets.
- FarmBotActivity: A general farmBot activity dataset that will be used in conjunction with DeviceHeatSensorVector for predictions.
How predictions and models are built is out of scope for this article. We will focus on how to streamline our data transformation operations instead.
Let’s generate some fake data for our use:
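The fake-data snippet is likewise an image in the original post. A minimal stand-in might look like the following; the model fields are assumptions and are declared inline so the snippet compiles on its own.

```scala
// Hypothetical fake-data generator; model fields are assumptions, declared
// inline so this snippet is self-contained.
object FakeData {
  type Id = String
  case class HeatSensorEvent(sensorId: Id, deviceId: Id, timestamp: Long, temperature: Double)
  case class DeviceHeartbeat(deviceId: Id, timestamp: Long)

  // Ten synthetic sensor readings spread over three sensors and two devices.
  val heatSensorEvents: Seq[HeatSensorEvent] =
    (1 to 10).map(i => HeatSensorEvent(s"sensor-${i % 3}", s"device-${i % 2}", 1000L + i, 20.0 + i))

  // A handful of heartbeats for the same two devices.
  val deviceHeartbeats: Seq[DeviceHeartbeat] =
    (1 to 4).map(i => DeviceHeartbeat(s"device-${i % 2}", 1000L + i))
}
```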
Now that the model classes and data are ready, we define our ops to:
1. Aggregate the Dataset[HeatSensorEvent] into a dataset of the following vector type:
case class HeatSensorVector(sensorId: Id, deviceId: Id, temperatureVector: Vector[Double])
2. Join Dataset[DeviceHeartbeat] with Dataset[HeatSensorVector] to produce Dataset[DeviceHeatSensorVector].
3. Repeat step 1 to build an aggregate dataset of Dataset[DeviceHeatSensorVector] and Dataset[FarmBotActivity] for predicting our bot's resource requirements.
Let's first implement our required transformations using the basic Spark Dataset APIs.
Brute Force Implementation
For a single use case, this may be the best way to go. No hassle, no extra implementation. Most engineers would probably prefer to see it this way, too.
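The brute-force snippet itself is an image in the original post. As a rough, Spark-free stand-in, here is the same inlined read-group-aggregate style on plain Scala collections, with `Seq` playing the role of `Dataset`; all names and fields are assumptions.

```scala
// Spark-free stand-in for the brute-force style: everything inlined in one chain.
// All names and fields are assumptions; Seq plays the role of Dataset here.
object BruteForce {
  case class HeatSensorEvent(sensorId: String, deviceId: String, timestamp: Long, temperature: Double)
  case class HeatSensorVector(sensorId: String, deviceId: String, temperatureVector: Vector[Double])

  val events = Seq(
    HeatSensorEvent("s1", "d1", 1L, 21.0),
    HeatSensorEvent("s1", "d1", 2L, 22.0),
    HeatSensorEvent("s2", "d1", 3L, 19.5)
  )

  // One-off transformation chain: group readings per (sensor, device) and
  // collapse them into a temperature vector; concise, but not reusable.
  val heatSensorVectors: Seq[HeatSensorVector] =
    events
      .groupBy(e => (e.sensorId, e.deviceId))
      .map { case ((sensorId, deviceId), es) =>
        HeatSensorVector(sensorId, deviceId, es.map(_.temperature).toVector)
      }
      .toSeq
}
```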
Now, let's imagine that for another project we need to use the Dataset[HeatSensorEvent] data to aggregate and join, extending our analysis with a third dataset, Dataset[FarmBotActivity].
Oops!!!!! Bummer. We have 2 options now:
- We either move our Spark app into a function and reuse it in two different places, OR
- Copy-paste the code to read the datasets again for our new transformation.
The second option goes straight against the DRY principle. But even the first option is hard, if not impossible, to maintain, because we cannot test the function in isolation: it uses the sparkSession to read and generate the dataset. This is where Scala's ad-hoc polymorphism comes to the rescue. Keep reading…
Optimizing Our Snippet using Ad-hoc Polymorphism
There are excellent articles on what ad-hoc polymorphism is and how Scala supports it. Here's an article that simplifies and explains the concept for newbies: https://dzone.com/articles/scala-ad-hoc-polymorphism-explained.
In short, ad-hoc polymorphism is:
With generic, we specify a generic code to run on various types, and with ad hoc polymorphism, we can change the generic code run based on the actual type polymorphed on!
That being said, we now need to think about how we can improve our chained functional code into a more reusable, well-tested application. Before we dive into optimizing our code, let's list the APIs we are using: read(), join(), and groupBy() followed by agg().
Implementing IoTReader[T] to Read Datasets
IoTReader[T] is a trait that exposes:
def read(paths: Any): Dataset[T]
On lines 12, 23 and 36 of the snippet, we implement implicit objects that extend IoTReader[T]. With these objects in scope, the Scala compiler can match and determine at compile time which dataset reader to use. So our read() APIs can now be called simply with:
val heatSensorEventDs = IoTReader.read[HeatSensorEvent](paths)
val deviceHeartbeatDs = IoTReader.read[DeviceHeartbeat](paths)
val farmBotActivityDs = IoTReader.read[FarmBotActivity](paths)
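Since the original snippet is not reproduced here, the following is a self-contained sketch of the pattern: `Dataset[T]` is simplified to `Seq[T]` and the readers return stubbed in-memory data, so the type-class mechanics run without Spark. Field names and the stub values are assumptions.

```scala
// Simplified sketch of IoTReader[T]: Dataset[T] becomes Seq[T] and the readers
// return stubbed in-memory data, so the type-class mechanics run without Spark.
object ReaderDemo {
  case class HeatSensorEvent(sensorId: String, temperature: Double) // fields assumed
  case class DeviceHeartbeat(deviceId: String, timestamp: Long)     // fields assumed

  trait IoTReader[T] {
    def read(path: String): Seq[T]
  }

  object IoTReader {
    // Summoner: the compiler picks the matching implicit instance for T.
    def read[T](path: String)(implicit reader: IoTReader[T]): Seq[T] = reader.read(path)

    implicit object HeatSensorEventReader extends IoTReader[HeatSensorEvent] {
      def read(path: String): Seq[HeatSensorEvent] = Seq(HeatSensorEvent("s1", 21.5)) // stub
    }
    implicit object DeviceHeartbeatReader extends IoTReader[DeviceHeartbeat] {
      def read(path: String): Seq[DeviceHeartbeat] = Seq(DeviceHeartbeat("d1", 1000L)) // stub
    }
  }

  // Call sites mirroring the article's examples:
  val heatSensorEventDs = IoTReader.read[HeatSensorEvent]("/data/heat")
  val deviceHeartbeatDs = IoTReader.read[DeviceHeartbeat]("/data/heartbeat")
}
```

Placing the implicit instances in the `IoTReader` companion object means they are found automatically during implicit search, without an extra import at the call site.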
Implementing IoTJoiner[T, U, V] instead of join
Similarly, an IoTJoiner is a trait that exposes:
def join(left: Dataset[T], right: Dataset[U])(implicit sparkSession: SparkSession): Dataset[V]
Just like our IoTReader, we can extend IoTJoiner[T, U, V] to implement DeviceHeatSensorVectorIoTJoiner; our joins will then become:
val deviceHeatSensorVectorDs = IoTJoiner.join(deviceHeartbeatDs, heatSensorVectorDs)
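Under the same simplifications as before (`Seq` instead of `Dataset`, no SparkSession, assumed field names), a sketch of the joiner might look like this; note that the output type `V` is fixed by whichever implicit instance the compiler resolves.

```scala
// Simplified sketch of IoTJoiner[T, U, V]: Seq instead of Dataset, no SparkSession;
// field choices are assumptions.
object JoinerDemo {
  case class DeviceHeartbeat(deviceId: String, timestamp: Long)
  case class HeatSensorVector(sensorId: String, deviceId: String, temperatureVector: Vector[Double])
  case class DeviceHeatSensorVector(deviceId: String, temperatureVector: Vector[Double])

  trait IoTJoiner[T, U, V] {
    def join(left: Seq[T], right: Seq[U]): Seq[V]
  }

  object IoTJoiner {
    // V is inferred from the implicit joiner instance the compiler finds.
    def join[T, U, V](left: Seq[T], right: Seq[U])(implicit joiner: IoTJoiner[T, U, V]): Seq[V] =
      joiner.join(left, right)

    implicit object DeviceHeatSensorVectorIoTJoiner
        extends IoTJoiner[DeviceHeartbeat, HeatSensorVector, DeviceHeatSensorVector] {
      def join(left: Seq[DeviceHeartbeat], right: Seq[HeatSensorVector]): Seq[DeviceHeatSensorVector] =
        for {
          hb  <- left
          hsv <- right
          if hb.deviceId == hsv.deviceId
        } yield DeviceHeatSensorVector(hb.deviceId, hsv.temperatureVector)
    }
  }

  val deviceHeatSensorVectorDs = IoTJoiner.join(
    Seq(DeviceHeartbeat("d1", 1000L)),
    Seq(HeatSensorVector("s1", "d1", Vector(21.0)), HeatSensorVector("s2", "d2", Vector(19.0)))
  )
}
```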
Implementing IoTAggregator[T, U] instead of groupBy and agg
Our third commonly used operation, aggregation, can similarly be converted into a trait that exposes:
def aggregate(in: Dataset[T])(implicit sparkSession: SparkSession): Dataset[U]
Just like the IoTReader and IoTJoiner implicit objects, we have created an implicit object called HeatSensorEventAggregator, which can be used as:
val heatSensorVectorDs = IoTAggregator.aggregate(heatSensorEventDs)
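With the same `Seq`-for-`Dataset` simplification and assumed field names, a runnable sketch of the aggregator:

```scala
// Simplified sketch of IoTAggregator[T, U]: Seq instead of Dataset; fields assumed.
object AggregatorDemo {
  case class HeatSensorEvent(sensorId: String, deviceId: String, temperature: Double)
  case class HeatSensorVector(sensorId: String, deviceId: String, temperatureVector: Vector[Double])

  trait IoTAggregator[T, U] {
    def aggregate(in: Seq[T]): Seq[U]
  }

  object IoTAggregator {
    // U is inferred from the implicit aggregator instance the compiler resolves.
    def aggregate[T, U](in: Seq[T])(implicit agg: IoTAggregator[T, U]): Seq[U] = agg.aggregate(in)

    implicit object HeatSensorEventAggregator extends IoTAggregator[HeatSensorEvent, HeatSensorVector] {
      def aggregate(in: Seq[HeatSensorEvent]): Seq[HeatSensorVector] =
        in.groupBy(e => (e.sensorId, e.deviceId))
          .map { case ((sid, did), es) => HeatSensorVector(sid, did, es.map(_.temperature).toVector) }
          .toSeq
    }
  }

  val heatSensorVectorDs = IoTAggregator.aggregate(Seq(
    HeatSensorEvent("s1", "d1", 21.0),
    HeatSensorEvent("s1", "d1", 23.0)
  ))
}
```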
Rewriting our Brute Force Implementation
The same snippet that took around 30 lines can now be crunched into merely 6 lines:
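Since the rewritten snippet is an image in the original post, here is a self-contained, Spark-free sketch of what the optimized job can look like (`Seq` stands in for `Dataset`, readers are stubbed, and all field names are assumptions); the point is that the pipeline itself shrinks to a few declarative lines at the bottom.

```scala
// End-to-end sketch of the optimized job: Seq for Dataset, stubbed readers,
// assumed fields. Each type class below is independently testable.
object OptimalPipeline {
  case class HeatSensorEvent(sensorId: String, deviceId: String, temperature: Double)
  case class DeviceHeartbeat(deviceId: String, timestamp: Long)
  case class HeatSensorVector(sensorId: String, deviceId: String, temperatureVector: Vector[Double])
  case class DeviceHeatSensorVector(deviceId: String, temperatureVector: Vector[Double])

  trait IoTReader[T] { def read(path: String): Seq[T] }
  object IoTReader {
    def read[T](path: String)(implicit r: IoTReader[T]): Seq[T] = r.read(path)
    implicit object HeatSensorEventReader extends IoTReader[HeatSensorEvent] {
      def read(path: String): Seq[HeatSensorEvent] =
        Seq(HeatSensorEvent("s1", "d1", 21.0), HeatSensorEvent("s1", "d1", 23.0)) // stub
    }
    implicit object DeviceHeartbeatReader extends IoTReader[DeviceHeartbeat] {
      def read(path: String): Seq[DeviceHeartbeat] = Seq(DeviceHeartbeat("d1", 1000L)) // stub
    }
  }

  trait IoTAggregator[T, U] { def aggregate(in: Seq[T]): Seq[U] }
  object IoTAggregator {
    def aggregate[T, U](in: Seq[T])(implicit a: IoTAggregator[T, U]): Seq[U] = a.aggregate(in)
    implicit object HeatSensorEventAggregator extends IoTAggregator[HeatSensorEvent, HeatSensorVector] {
      def aggregate(in: Seq[HeatSensorEvent]): Seq[HeatSensorVector] =
        in.groupBy(e => (e.sensorId, e.deviceId))
          .map { case ((s, d), es) => HeatSensorVector(s, d, es.map(_.temperature).toVector) }
          .toSeq
    }
  }

  trait IoTJoiner[T, U, V] { def join(left: Seq[T], right: Seq[U]): Seq[V] }
  object IoTJoiner {
    def join[T, U, V](left: Seq[T], right: Seq[U])(implicit j: IoTJoiner[T, U, V]): Seq[V] =
      j.join(left, right)
    implicit object DeviceHeatSensorVectorIoTJoiner
        extends IoTJoiner[DeviceHeartbeat, HeatSensorVector, DeviceHeatSensorVector] {
      def join(left: Seq[DeviceHeartbeat], right: Seq[HeatSensorVector]): Seq[DeviceHeatSensorVector] =
        for { hb <- left; v <- right if hb.deviceId == v.deviceId }
          yield DeviceHeatSensorVector(hb.deviceId, v.temperatureVector)
    }
  }

  // The rewritten job: each step is one declarative line.
  val heatSensorEventDs        = IoTReader.read[HeatSensorEvent]("/data/heat")
  val deviceHeartbeatDs        = IoTReader.read[DeviceHeartbeat]("/data/heartbeat")
  val heatSensorVectorDs       = IoTAggregator.aggregate(heatSensorEventDs)
  val deviceHeatSensorVectorDs = IoTJoiner.join(deviceHeartbeatDs, heatSensorVectorDs)
}
```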
Great job!!!! Now you can reuse your readers, joiners and aggregators in any project as a library, and reduce the pain of rewriting heavy Spark transformations!
Do you need these implicit ad-hoc polymorphic factories?
It depends. I believe there are 2 cases where teams can vastly benefit from such implementations:
- Data governance, privacy and retention policies play a big role in the usage of data: Here, a single team can keep tight control over these APIs; instead of exposing the warehouse directly, they maintain the APIs. Of course there is an overhead of maintaining them, but in hindsight, we are reducing possible code duplication across multiple teams tenfold.
- When schemas of datasets are too complex: Users can vastly benefit from well-documented APIs, as they are much more readable and easy to use. A machine learning engineer's or data scientist's primary concern is to build, train and test models. They don't need to investigate and learn how to prepare or fetch data.
Last but not least, every single line in the OptimalCodeWithIoTFactories.scala snippet can be individually tested and relied upon, with standardized versioning and artifacts, before publishing!
Let me know what you think about these implementations. All comments are welcome! Thanks for reading.