Scala explode followed by UDF on a dataframe fails

I have a scala dataframe with the following schema:

root
 |-- time: string (nullable = true)
 |-- itemId: string (nullable = true)
 |-- itemFeatures: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
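For reference, a DataFrame with exactly this schema can be built from Scala `Map[String, String]` values. The sketch below assumes a local `SparkSession` (in spark-shell one already exists as `spark`), and the sample data and the `SchemaSketch` object are hypothetical. It also shows what `explode` does to a map column: it emits one row per map entry, in two new columns named `key` and `value`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

object SchemaSketch {
  // Hypothetical sample data; Map[String, String] is encoded as map<string,string>
  // with valueContainsNull = true, matching the schema above.
  def itemsDf(spark: SparkSession): DataFrame = {
    import spark.implicits._
    List(
      ("time1", "id1", Map("color" -> "red")),
      ("time2", "id2", Map("size" -> "L", "color" -> "blue"))
    ).toDF("time", "itemId", "itemFeatures")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: running locally; spark-shell users can reuse the existing `spark`.
    val spark = SparkSession.builder().master("local[*]").appName("schema-sketch").getOrCreate()

    val df = itemsDf(spark)
    df.printSchema()

    // explode on a map column produces one row per entry, in columns `key` and `value`.
    val exploded = df.select(col("time"), col("itemId"), explode(col("itemFeatures")))
    exploded.printSchema()
    println(exploded.count()) // 3: one row per map entry (1 + 2)

    spark.stop()
  }
}
```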

I want to explode the itemFeatures column and then send my dataframe to a UDF. But as soon as I include the explode, calling the UDF results in this error: org.apache.spark.SparkException: Task not serializable

I can't figure out why.

Environment: Scala 2.11.12, Spark 2.4.4

Full example:

val dataList = List(
    ("time1", "id1", "map1"),
    ("time2", "id2", "map2"))
val df = dataList.toDF("time", "itemId", "itemFeatures")
val dfExploded = df.select(col("time"), col("itemId"), explode("itemFeatures"))

val doNextThingUDF: UserDefinedFunction = udf(doNextThing _)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time"))

where my UDF looks like this:

val doNextThing(time: String): String = {
  time+"blah"
}

If I remove the explode, everything works fine, and if I don't call the UDF after the explode, everything also works fine. I could imagine that Spark is somehow unable to send each row to a UDF while it is dynamically executing the explode and doesn't know how many rows there will be, but even when I add e.g. dfExploded.cache() and dfExploded.count(), I still get the error. Is this a known issue? What am I missing?



Solution 1:[1]

I think the issue comes from how you define your doNextThing function. There are also a couple of typos in your "full example".

In particular, the itemFeatures column is a string in your example; I assume it is meant to be a Map. Here is a working example:

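import org.apache.spark.sql.functions.{col, explode, udf}
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

// itemFeatures is now a real Map, so explode has a map column to work on
val dataList = List(
    ("time1", "id1", Map("map1" -> 1)),
    ("time2", "id2", Map("map2" -> 2)))

val df = dataList.toDF("time", "itemId", "itemFeatures")
// explode on a map column adds two columns: key and value
val dfExploded = df.select(col("time"), col("itemId"), explode($"itemFeatures"))

// define the UDF body as a function value instead of a method
val doNextThing = (time: String) => time + "blah"
val doNextThingUDF = udf(doNextThing)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time")))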
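One plausible reason the definition matters, assuming the original doNextThing was a method on some non-serializable class or notebook wrapper: eta-expanding a method (`doNextThing _`) produces a function that captures the enclosing instance, and Spark has to Java-serialize the UDF's function before shipping it to executors. The pure-Scala sketch below imitates that serialization check; `Helper`, `Fns` and `SerializationCheck` are hypothetical names used only for illustration, and this is not Spark's own code.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable enclosing class (stands in for a notebook
// wrapper or a class that holds other non-serializable state).
class Helper {
  def doNextThing(time: String): String = time + "blah"
  // Eta-expansion of a method: the resulting function captures `this` (a Helper).
  def asFunction: String => String = doNextThing _
}

// A standalone function value that does not capture any enclosing instance.
object Fns {
  val doNextThing: String => String = (time: String) => time + "blah"
}

object SerializationCheck {
  // Rough imitation of the check that leads to "Task not serializable":
  // try to Java-serialize the function object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new Helper().asFunction)) // false: drags Helper along
    println(isSerializable(Fns.doNextThing))         // true: nothing captured
  }
}
```

The usual fix suggested by this sketch matches the answer: pass `udf` a function value that does not reference an enclosing instance (or move the method into a standalone object).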

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

[1] Solution 1, source: Stack Overflow