Pass accumulators to a Spark UDF

This is a simplified version of what I am trying to do. I want to do some counting inside my UDF. One way of doing it, I think, is to pass long accumulators to the UDF and increment them inside the if/else branches of the deserializeProtobuf function, but I am not able to get the syntax working. Can anyone help me with that? Is there a better way?

def deserializeProtobuf(raw_data: Array[Byte]) = {

    val input_stream = new ByteArrayInputStream(raw_data)
    val parsed_data = CustomClass.parseFrom(input_stream)

    if (condition 1 related to parsed_data) {
        < increment variable1 >
    }
    else if (condition 2 related to parsed_data) {
        < increment variable2 >
    }
    else {
        < increment variable3 >
    }

    parsed_data
}



val decode = udf(deserializeProtobuf _)
      
val deserialized_data = ds.withColumn("data", decode(col("protobufData")))


Solution 1:[1]

I have done something like this before. If you are doing heavy lifting in your CustomClass, one thing I can suggest is to broadcast it; you can also instantiate the metrics on the broadcast variable.

As for the counting part: I tried accumulators, but they were quite difficult to manage inside a UDF (getting a correct count over a window), so instead I used spark-metrics and sent the counts at a regular interval:

https://github.com/groupon/spark-metrics

Make sure to initialise the metrics when the broadcast variable is created; from that point on, every copy of the variable will report to the same metrics.
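The broadcast part of this advice can be sketched as follows. This is only an illustration, not the answerer's actual code: `ProtoParser` and its members are hypothetical stand-ins for the heavy CustomClass setup, and the metric registration is left as a comment because the spark-metrics API is not shown in the answer.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Hypothetical wrapper around the expensive parsing machinery.
class ProtoParser extends Serializable {
  // Runs once per executor JVM, the first time the copy is used;
  // this is where the answer suggests initialising the metrics.
  @transient lazy val initialised: Boolean = {
    // e.g. register spark-metrics counters here
    true
  }
  def parse(raw: Array[Byte]): String = new String(raw, "UTF-8") // placeholder
}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Broadcast a single instance; each executor deserialises its own copy once
// instead of shipping it with every task.
val parserBc = spark.sparkContext.broadcast(new ProtoParser)

val decode = udf { (raw: Array[Byte]) =>
  val p = parserBc.value
  p.initialised // force the one-time init (metrics included) on first use
  p.parse(raw)
}
```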

Solution 2:[2]

You shouldn't have to pass the accumulator to the UDF:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{udf, col}
import org.apache.spark.util.LongAccumulator

var acc1: LongAccumulator = null
def my_udf = udf { (arg1: String) =>
     ...
     acc1.add(1)
}
val spark = SparkSession...
acc1      = spark.sparkContext.longAccumulator("acc1")
... withColumn("col_name", my_udf(col("...")))
// some action here to cause the withColumn to execute
System.err.println(s"${acc1.value}")
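Applied to the question, this pattern looks roughly like the sketch below: one accumulator per branch, registered on the driver, incremented inside the UDF. The branch conditions and sample data are illustrative placeholders, since the real conditions depend on the parsed protobuf.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{udf, col}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// One accumulator per branch of the if/else in deserializeProtobuf.
val count1 = spark.sparkContext.longAccumulator("variable1")
val count2 = spark.sparkContext.longAccumulator("variable2")
val count3 = spark.sparkContext.longAccumulator("variable3")

// Stand-in for CustomClass.parseFrom: here we just decode the bytes.
val decode = udf { (raw: Array[Byte]) =>
  val parsed = new String(raw, "UTF-8")
  if (parsed.startsWith("a"))      count1.add(1)
  else if (parsed.startsWith("b")) count2.add(1)
  else                             count3.add(1)
  parsed
}

val ds = Seq("alpha".getBytes, "beta".getBytes, "gamma".getBytes)
  .toDF("protobufData")
val deserialized = ds.withColumn("data", decode(col("protobufData")))

deserialized.count() // an action: triggers the UDF, populating the accumulators
println(s"c1=${count1.value} c2=${count2.value} c3=${count3.value}")
```

One caveat worth knowing: Spark only guarantees exactly-once accumulator updates inside actions; updates made inside transformations (as here) can be double-counted if a task is retried or a stage recomputed.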

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: (author not listed)
Solution 2: Mark Rajcok