PySpark - Use a dataclass inside a map function - can't pickle _abc_data objects

from typing import Optional, List
from dataclasses_json import DataClassJsonMixin
from dataclasses import dataclass, field
from uuid import uuid4 as uuid
from pyspark.sql import SparkSession, Row
from datetime import datetime, date
from pyspark.sql.functions import concat_ws, concat, col, lit
from pyspark.sql.types import StructType,StructField, StringType
import requests, json
#import cloudpickle
#import pyspark.serializers


@dataclass
class example(DataClassJsonMixin):
    """Request payload that can be rendered as a JSON string.

    The class defines no methods of its own; ``to_json()``, which this script
    calls on instances, comes from ``DataClassJsonMixin``.
    """

    # Optional header value; None when not supplied.
    DataHeaders: Optional[str] = None
    # Optional data value; None when not supplied.
    DataValues: Optional[str] = None
    # Timestamp (datetime.isoformat() string) taken when the instance is
    # created.  A default_factory is needed here: a plain
    # ``= datetime.now().isoformat()`` default is evaluated once, when the
    # class body runs, so every instance would carry that same stale value.
    DateofData: str = field(default_factory=lambda: datetime.now().isoformat())

# Obtain a Spark session via the builder (getOrCreate).
spark = SparkSession.builder.getOrCreate()

# Optional workaround referred to in the question: point pyspark.serializers
# at the standalone cloudpickle module (requires the two commented-out
# imports at the top of the file).
#pyspark.serializers.cloudpickle = cloudpickle

# Three-row sample DataFrame with an int, a float, a string, a date and a
# datetime column.
df_test_row = spark.createDataFrame(
    data=[
        Row(
            a=1,
            b=2.0,
            c='string1',
            d=date(2000, 1, 1),
            e=datetime(2000, 1, 1, 12, 0),
        ),
        Row(
            a=2,
            b=3.0,
            c='string2',
            d=date(2000, 2, 1),
            e=datetime(2000, 1, 2, 12, 0),
        ),
        Row(
            a=4,
            b=5.0,
            c='string3',
            d=date(2000, 3, 1),
            e=datetime(2000, 1, 3, 12, 0),
        ),
    ]
)
df_test_row.show()

# Sanity check outside of any RDD operation: build one payload and print
# its JSON representation.
cd = example(DataHeaders="a", DataValues="b")
json_req_str = cd.to_json()

print(json_req_str)

def function_odp_req(row):
    """Build the JSON request string for a single input row.

    Args:
        row: Record exposing ``a`` and ``b`` as attributes (in this script,
            a ``pyspark.sql.Row`` taken from ``df_test_row``).

    Returns:
        A 2-tuple ``(1, json_request)``: the constant id ``1`` followed by
        the ``example`` payload serialized with ``to_json()``.
    """
    # Use ordinary attribute access rather than invoking the __getattr__
    # hook by hand; the hook is only a fallback and calling it directly
    # skips the normal attribute lookup.
    # NOTE(review): in the sample data ``a`` is an int and ``b`` a float,
    # while the payload fields are annotated Optional[str]; dataclasses do
    # not enforce annotations, so the values are passed through unchanged.
    payload = example(DataHeaders=row.a, DataValues=row.b)
    return (1, payload.to_json())

# Apply function_odp_req to every row of the DataFrame's underlying RDD,
# producing (id, json_request) tuples.
df_res = df_test_row.rdd.map(function_odp_req)

# Output schema: two nullable string columns.
test = StructType(
    [
        StructField('id', StringType(), True),
        StructField('json_request', StringType(), True),
    ]
)

# Turn the mapped RDD back into a DataFrame and display its schema and rows.
final_df_json = spark.createDataFrame(df_res, test)
final_df_json.printSchema()
final_df_json.show()

The above is the full example. I am not able to get the actual JSON string from the dataclass when it is used inside the lambda function.

I have referred to multiple posts, but with no luck. Any help is appreciated.

When the above is executed, it produces the following output:

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

{"DataHeaders": "a", "DataValues": "b", "DateofData": "2021-03-24T15:05:10.845998"}
root
 |-- id: string (nullable = true)
 |-- json_request: string (nullable = true)

+---+------------+
| id|json_request|
+---+------------+
|  1|          {}|
|  1|          {}|
|  1|          {}|
+---+------------+

If you look at the sample DataFrame output above, json_request comes out empty. When the same code is used outside of the map function, the JSON string is generated correctly.

When I do not enable "cloudpickle", I see the exception below:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-4696972411027040040.py", line 468, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 49, in <module>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 749, in createDataFrame
    jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2298, in _to_java_object_rdd
    return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2532, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2434, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 600, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _abc_data objects

So, as I understand it, the "example" object used inside the function is not getting serialized.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source