Pyspark - Use a dataclass inside map function - can't pickle _abc_data objects
from typing import Optional, List
from dataclasses_json import DataClassJsonMixin
from dataclasses import dataclass, field
from uuid import uuid4 as uuid
from pyspark.sql import SparkSession, Row
from datetime import datetime, date
from pyspark.sql.functions import concat_ws, concat, col, lit
from pyspark.sql.types import StructType,StructField, StringType
import requests, json
#import cloudpickle
#import pyspark.serializers
@dataclass
class example(DataClassJsonMixin):
    DataHeaders: Optional[str] = None
    DataValues: Optional[str] = None
    DateofData: str = datetime.now().isoformat()
spark = SparkSession.builder.getOrCreate()
#pyspark.serializers.cloudpickle = cloudpickle
df_test_row = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df_test_row.show()
cd = example(DataHeaders="a", DataValues="b")
json_req_str = cd.to_json()
print(json_req_str)
def function_odp_req(row):
    a = row.__getattr__("a")
    b = row.__getattr__("b")
    cd = example(DataHeaders=a, DataValues=b)
    json_req_str = cd.to_json()
    return (1, json_req_str)
df_res = df_test_row.rdd.map(function_odp_req)
test = StructType([StructField('id', StringType(), True), StructField('json_request', StringType(), True)])
final_df_json = spark.createDataFrame(data=df_res, schema=test)
final_df_json.printSchema()
final_df_json.show()
The above is the full example. I am not able to get the actual JSON string from the dataclass when it is used inside the lambda function. I have referred to multiple posts, but with no luck. Any help is appreciated.
When the above gets executed, it generates the response below:
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+
{"DataHeaders": "a", "DataValues": "b", "DateofData": "2021-03-24T15:05:10.845998"}
root
|-- id: string (nullable = true)
|-- json_request: string (nullable = true)
+---+------------+
| id|json_request|
+---+------------+
| 1| {}|
| 1| {}|
| 1| {}|
+---+------------+
If you look at the sample dataframe output above, json_request comes back empty. When the same code is used outside of the map function, the JSON string is generated correctly.
When I do not enable cloudpickle, I see the exception below:
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-4696972411027040040.py", line 468, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 49, in <module>
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 749, in createDataFrame
jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2298, in _to_java_object_rdd
return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2532, in _jrdd
self._jrdd_deserializer, profiler)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2434, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 600, in dumps
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _abc_data objects
So my understanding is that the "example" object used inside the function is not getting serialized.
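One workaround I am experimenting with (a minimal sketch, not tested on the cluster): drop DataClassJsonMixin and build the JSON string with the standard library instead, so the class carries no ABC machinery for pickle/cloudpickle to trip over. The names ExamplePlain and to_json_request are my own; row["a"] mirrors Row's mapping-style access.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class ExamplePlain:
    # Plain dataclass: no DataClassJsonMixin, hence no abc-derived
    # attributes that could raise "can't pickle _abc_data objects".
    DataHeaders: Optional[str] = None
    DataValues: Optional[str] = None
    DateofData: str = field(default_factory=lambda: datetime.now().isoformat())

def to_json_request(row):
    # Same shape as function_odp_req, but serializes via the stdlib:
    # asdict() turns the dataclass into a dict, json.dumps into a string.
    cd = ExamplePlain(DataHeaders=row["a"], DataValues=row["b"])
    return (1, json.dumps(asdict(cd)))

print(to_json_request({"a": "1", "b": "2.0"}))
```

This at least produces a non-empty JSON string locally; I have not yet confirmed it behaves the same inside df_test_row.rdd.map on the executors.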
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
