Spark - tracking schema changes through transformations
I'm working on a code-base that groups together chunks of logic into classes. For example, there might be a class that calculates the mean of a column after filtering, and another that aggregates columns to calculate the unique_count of certain fields. Sample code below:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, lit}

// note: this interface is mutable and we can add things to this interface as needed
trait Processor {
  def dosomething(input: DataFrame): DataFrame
}

// adds a column derived from "score"
class NormScore extends Processor {
  def dosomething(input: DataFrame): DataFrame = {
    input.withColumn("normalized_score", col("score") - lit(1))
  }
}

// counts rows per distinct "score" value
class Distribution extends Processor {
  def dosomething(input: DataFrame): DataFrame = {
    input.groupBy(col("score"))
      .agg(count(lit(1)).as("count"))
  }
}
At the end, we chain these together to form a pipeline of transformations:
val pipeline: Seq[Processor] = Seq(new Distribution(), new NormScore())
We then apply these transformations in sequence to a given input.
Problem: We know the schema of the input, and I would like to support a simple def validate(pipeline: Seq[Processor]) method that checks whether a pipeline can work at all. For example, if you aggregate on a field that doesn't exist, Spark throws an error. However, since we know all the types involved, it feels like we should be able to figure this out before Spark even starts. Today we catch this in a unit test that runs the pipeline on mock data, but that can take more than a minute depending on the complexity of the transform, and it requires generating data. Instead, I'd like to see if I can deduce the schema changes myself without needing Spark.
This boils down to keeping track of all schema changes. Each Processor (a) consumes records with the input schema, and is valid only if that schema has the columns the processor uses, and (b) generates records with an output schema, which is then fed into the next Processor as its input.
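To make the idea concrete, here is a minimal sketch of that schema-threading approach. It is not a working solution to the question. Everything in it is hypothetical: Field and Schema are Spark-free stand-ins for StructField and StructType, SchemaStep is an assumed companion to Processor in which each step declares by hand which columns it reads and what it emits, and the output types (for example, that normalized_score keeps the type of score) are simplifying assumptions rather than Spark's actual type-coercion rules.

```scala
// Hypothetical, Spark-free stand-ins for StructField / StructType.
final case class Field(name: String, dataType: String)
final case class Schema(fields: Seq[Field]) {
  def names: Set[String] = fields.map(_.name).toSet
  def typeOf(column: String): Option[String] =
    fields.find(_.name == column).map(_.dataType)
}

// Hypothetical companion to Processor: what a step reads and what it emits.
trait SchemaStep {
  def name: String
  def requiredColumns: Set[String]
  def outputSchema(input: Schema): Schema
}

object NormScoreStep extends SchemaStep {
  val name = "NormScore"
  val requiredColumns: Set[String] = Set("score")
  // Mirrors withColumn("normalized_score", ...): appends the new column.
  // Assumption: the result keeps the type of "score"; replacing an existing
  // "normalized_score" column is not modelled.
  def outputSchema(input: Schema): Schema = {
    val scoreType = input.typeOf("score").getOrElse("double")
    Schema(input.fields :+ Field("normalized_score", scoreType))
  }
}

object DistributionStep extends SchemaStep {
  val name = "Distribution"
  val requiredColumns: Set[String] = Set("score")
  // Mirrors groupBy("score").agg(count(...).as("count")): only the grouping
  // key and the aggregate survive.
  def outputSchema(input: Schema): Schema =
    Schema(input.fields.filter(_.name == "score") :+ Field("count", "long"))
}

object PipelineValidator {
  // Threads the schema through each step. Once a step's required columns are
  // missing, the error is kept and later steps are skipped.
  def validate(input: Schema, pipeline: Seq[SchemaStep]): Either[String, Schema] =
    pipeline.foldLeft[Either[String, Schema]](Right(input)) { (acc, step) =>
      acc.flatMap { schema =>
        val missing = step.requiredColumns -- schema.names
        if (missing.isEmpty) Right(step.outputSchema(schema))
        else Left(s"${step.name}: missing column(s) ${missing.toSeq.sorted.mkString(", ")}")
      }
    }
}
```

With an input schema containing id and score, PipelineValidator.validate(input, Seq(DistributionStep, NormScoreStep)) returns a Right holding the final schema, while a schema without score returns a Left naming the step and the missing column. The obvious weakness is that requiredColumns and outputSchema are written by hand and can drift from what dosomething actually does, which is exactly the gap that deriving them from the expression tree would close.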
What I've tried: I've tried mapping columns back to the Spark expressions and traversing the Catalyst expression tree. However, this doesn't work well for .withColumn-style additions. I also have no clue whether I'm working with the Catalyst expression tree correctly, or how I would track which input fields a transformation uses (I figure I can leverage expr.dataType to figure out what's generated).
Any suggestions?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
