Scala explode followed by UDF on a dataframe fails
I have a Spark DataFrame in Scala with the following schema:
root
|-- time: string (nullable = true)
|-- itemId: string (nullable = true)
|-- itemFeatures: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
I want to explode the itemFeatures column and then send my dataframe to a UDF. But as soon as I include the explode, calling the UDF results in this error:
org.apache.spark.SparkException: Task not serializable
I can't figure out why???
Environment: Scala 2.11.12, Spark 2.4.4
Full example:
val dataList = List(
("time1", "id1", "map1"),
("time2", "id2", "map2"))
val df = dataList.toDF("time", "itemId", "itemFeatures")
val dfExploded = df.select(col("time"), col("itemId"), explode("itemFeatures"))
val doNextThingUDF: UserDefinedFunction = udf(doNextThing _)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time")))
where my UDF looks like this:
def doNextThing(time: String): String = {
time+"blah"
}
If I remove the explode, everything works fine, and if I don't call the UDF after the explode, everything works fine too. I could imagine Spark is somehow unable to send each row to a UDF if it is dynamically executing the explode and doesn't know how many rows there are going to be, but even when I add, e.g., dfExploded.cache() and dfExploded.count(), I still get the error. Is this a known issue? What am I missing?
Solution 1:[1]
I think the issue comes from how you define your doNextThing function. Also, there are a couple of typos in your "full example". In particular, the itemFeatures column is a string in your example, whereas I understand it should be a Map. Here is a working example:
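import org.apache.spark.sql.functions.{col, explode, udf}
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell
val dataList = List(
("time1", "id1", Map("map1" -> 1)),
("time2", "id2", Map("map2" -> 2)))
val df = dataList.toDF("time", "itemId", "itemFeatures")
// explode takes a Column, not a String
val dfExploded = df.select(col("time"), col("itemId"), explode($"itemFeatures"))
// Define the logic as a function value rather than as a method
val doNextThing = (time: String) => {time+"blah"}
val doNextThingUDF = udf(doNextThing)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time")))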
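As for why the function definition might matter: the question does not show where doNextThing lives, so this is only a sketch under the assumption that it was a method (def) on a class that is not Serializable. Turning such a method into a function with doNextThing _ produces a closure that holds a reference to the enclosing instance, and Spark has to serialize that closure to ship the UDF to the executors. The example below uses plain Java serialization, without Spark, to illustrate the difference. Job, Standalone and SerializationCheck are hypothetical names made up for the illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for an enclosing class that is NOT Serializable
// (for example a job, notebook wrapper or test class).
class Job {
  def doNextThing(time: String): String = time + "blah"

  // Eta-expansion of an instance method: the resulting function
  // keeps a reference to `this`, i.e. to the whole Job instance.
  val fromMethod: String => String = doNextThing _
}

object Standalone {
  // A function literal that references nothing outside itself.
  val doNextThing: String => String = (time: String) => time + "blah"
}

object SerializationCheck {
  // Returns true if Java serialization of `obj` succeeds, false if it
  // fails because something reachable from `obj` is not Serializable.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    // Captures the non-serializable Job instance.
    println(serializes(new Job().fromMethod))   // false
    // Captures nothing.
    println(serializes(Standalone.doNextThing)) // true
  }
}
```

If the logic has to stay in a method, moving that method into an object, or making the enclosing class Serializable, are other common ways to avoid capturing a non-serializable instance.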
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
