Spark NotSerializableException when overriding log4j logs

I have overridden the Databricks log4j configuration using an init script. When my code is triggered it runs fine up to a point, but when it reaches the lines below:

val ds = df.as[MySDMData]

ds.map(a => func1(a)).write.format("delta").mode("overwrite").option("header","true").save(s"${Interimpath}/sdm_outer_java")

it fails with the following stacktrace:

Caused by: Job aborted due to stage failure.
Caused by: NotSerializableException: org.apache.log4j.Logger
Serialization stack:
    - object not serializable (class: org.apache.log4j.Logger, value: org.apache.log4j.Logger@33280b3f)
    - field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw$$iw, name: logger, type: class org.apache.log4j.Logger)
    - object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw$$iw@78706dad)
    - field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw$$iw)
    - object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw@259eefbe)
    - field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw$$iw)
    - object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw@58203e28)
    - field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw$$iw)
    - object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw@621d2f16)
    - field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw$$iw)
    - object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw@49da2284)
    - field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw$$iw)
    - object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw, $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw@4f8ac0c9)
    - field (class: $linef43f9ceebbd54e07ba09b7bf5984364029.$read, name: $iw, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read$$iw)
    - object (class $linef43f9ceebbd54e07ba09b7bf5984364029.$read, $linef43f9ceebbd54e07ba09b7bf5984364029.$read@3fc7ede0)
    - field (class: $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $linef43f9ceebbd54e07ba09b7bf5984364029$read, type: class $linef43f9ceebbd54e07ba09b7bf5984364029.$read)
    - object (class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@375936db)
    - field (class: $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $outer, type: class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw)
    - object (class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@750dbdc3)
    - field (class: $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $outer, type: class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw)
    - object (class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@4c9f0247)
    - field (class: $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $outer, type: class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw)
    - object (class $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $linef43f9ceebbd54e07ba09b7bf5984364041.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@e81047)
    - element of array (index: 4)
    - array (class [Ljava.lang.Object;, size 7)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$6664/133918073, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$6664/133918073@36b47765)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted:(Lscala/Function2;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.rdd.RDD$$Lambda$6661/1112983908, org.apache.spark.rdd.RDD$$Lambda$6661/1112983908@63563080)
    - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[4613] at execute at DeltaInvariantCheckerExec.scala:85)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (MapPartitionsRDD[4613] at execute at DeltaInvariantCheckerExec.scala:85,org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$7277/44213980@6853ff1b))

This is my case class:

case class MyData(var case_id: String,
                  var mbr_facet_id: String,
                  var mbr_id: String,
                  ....
                  var tpc_chg: String,
                  var icue_evi_flg: String) 

I am converting another DataFrame to this case class, as shown below:

val ds = df.as[MyData]

ds.map(a => func1(a)).write.mode("overwrite").option("header","true").parquet(s"${Interimpath}/cdf_cpm_interim3") 

When execution reaches this point I get the error.

Is this error caused by the map function? How do I solve it?

New Edit

We have a DataFrame df, and MySDMData is a case class with some fields.

Using this I make the data types match between the DataFrame and the case class:

val ds = df.as[MySDMData]

Here ds is a Dataset[MySDMData].
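For reference, here is a minimal sketch of that conversion (assuming spark is the active SparkSession in the notebook; the import supplies the Encoder that both .as[MySDMData] and the later ds.map call rely on):

import org.apache.spark.sql.Dataset
import spark.implicits._  // provides Encoder[MySDMData] for .as[...] and .map

// df's column names and types must line up with the fields of MySDMData
val ds: Dataset[MySDMData] = df.as[MySDMData]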

Then I do the following:

ds.map(a => func1(a)).write.format("delta").mode("overwrite").option("header", "true").save(s"${Interimpath}/sdm_outer_java")

where func1 is a method that accepts a MySDMData instance, performs some logical operations, and returns a new MySDMData:

def func1(ds: MySDMData): MySDMData = {
  /* logical operations */
  val obj = MySDMData(ds.case_id, ds.mbr_facet_id, ds.mbr_id, ....)
  obj // return
}



Solution 1:[1]

Are you adding an instance of org.apache.log4j.Logger to one of your classes/case classes? See these lines in your stack trace:

Caused by: NotSerializableException: org.apache.log4j.Logger // this type is un-serializable
----
field (class: ..., name: logger, type: class org.apache.log4j.Logger)
                         ^                   ^
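That usually happens when a Logger is declared as a field next to the function the closure calls. The question does not show such a declaration, so the following is only an illustrative guess at the pattern (the logger name and its placement are assumptions):

import org.apache.log4j.Logger

// In a notebook/REPL, a top-level val actually lives inside a generated wrapper
// class (the $iw classes visible in the stack trace above).
val logger = Logger.getLogger("MyJob")  // org.apache.log4j.Logger is not Serializable

def func1(ds: MySDMData): MySDMData = {
  logger.info(s"processing case ${ds.case_id}")  // drags the wrapper, and the logger, into the closure
  ds.copy()                                      // stand-in for the real logic
}

// ds.map(a => func1(a)) must then serialize that wrapper to ship the task to the
// executors, which fails with NotSerializableException: org.apache.log4j.Logger.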

Spark is trying to serialize the logger. If so, don't do that: a logger is not something to serialize and ship somewhere else. Loggers belong to a specific scope of your code (they are meant to be local to where they are used).
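Below is a sketch of two common ways to avoid the capture, assuming the logger is currently a field of the notebook cell or class that func1 lives in (the object and logger names are illustrative):

import org.apache.log4j.Logger

// Option 1: keep the logger out of the closure by putting it in a standalone object.
// Referencing an object from a closure does not serialize it; each executor
// initialises its own copy, and the @transient lazy val re-creates the logger
// there if the enclosing object ever is serialized.
object JobLogging extends Serializable {
  @transient lazy val logger: Logger = Logger.getLogger("MyJob")
}

object Transforms {
  def func1(ds: MySDMData): MySDMData = {
    JobLogging.logger.info(s"processing case ${ds.case_id}")
    ds.copy() // stand-in for the real logic
  }
}

// Option 2: create the logger inside the function, so there is nothing to capture.
def func1Local(ds: MySDMData): MySDMData = {
  val logger = Logger.getLogger("MyJob")
  logger.info(s"processing case ${ds.case_id}")
  ds.copy()
}

With either variant the lambda passed to ds.map no longer carries a Logger field, so Spark's closure serializer succeeds and the write proceeds as before.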

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 AminMal