Write large PySpark dataframe to S3 very slow

This question is related to my previous question at aggregate multiple columns in sql table as json or array.

I am posting some updates and follow-up questions here because I ran into a new problem.

I would like to query a table in a Presto database from PySpark (through Hive) and create a PySpark dataframe from it. I then need to save that dataframe to S3 quickly and read it back from S3 efficiently as Parquet (or any other format, as long as it can be read and written fast).
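For context, this is roughly the end-to-end flow I have in mind (a minimal sketch; the database, table, and bucket names are placeholders, not my real ones):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("presto_to_s3").enableHiveSupport().getOrCreate()

# query the table through the Hive metastore (placeholder database/table names)
df = spark.sql("SELECT * FROM my_db.my_table")

# save to S3 as Parquet, then read it back later (placeholder bucket/prefix)
df.write.parquet("s3://my-bucket/my_prefix/", mode="overwrite")
df_back = spark.read.parquet("s3://my-bucket/my_prefix/")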

In order to keep the size as small as possible, I have aggregated some columns into a json object.

The original table (more than 10^9 rows; some columns, e.g. obj_desc, may contain more than 30 English words):

id   cat_name    cat_desc       obj_name   obj_desc     obj_num
1    furniture   living office  desk       4 corners    1.5
1    furniture   living office  chair      4 legs       0.8
1    furniture   restroom       tub        white wide   2.7
1    cloth       fashion        T-shirt    black large  1.1

This is how I aggregate those columns into a JSON object:

from pyspark.sql import functions as F

aggregation_cols = ['cat_name', 'cat_desc', 'obj_name', 'obj_desc', 'obj_num']  # they are all strings

# pack the per-row attributes into one JSON string and drop the original columns
df_temp = df.withColumn("cat_obj_metadata", F.to_json(F.struct(*aggregation_cols))).drop(*aggregation_cols)

# collect all JSON strings with the same id into a single array column
df_temp_agg = df_temp.groupBy('id').agg(F.collect_list('cat_obj_metadata').alias('cat_obj_metadata'))

df_temp_agg.cache()
df_temp_agg.printSchema()
# df_temp_agg.count()  # this takes a very long time and never returns, so I am not sure how large the result is
df_temp_agg = df_temp_agg.repartition(1024)  # not sure what the optimal number should be
df_temp_agg.write.parquet(s3_path, mode='overwrite')  # this runs for a long time (> 12 hours) and never returns
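As a side note on the repartition step: repartition() returns a new dataframe rather than modifying it in place, and the shuffle width used by the groupBy can be set separately. A minimal sketch of checking and controlling this before the write (512 is a placeholder value, not a tuned number):

# repartition() returns a new dataframe, so it must be reassigned to take effect
print(df_temp_agg.rdd.getNumPartitions())  # check how many partitions exist before the write

# parallelism used by the groupBy/agg shuffle (512 is a placeholder, not a tuned value)
spark.conf.set("spark.sql.shuffle.partitions", 512)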

I am working on an m4.4xlarge cluster with 4 nodes, and none of the cores look busy. I also checked the S3 bucket: no folder has been created at "s3_path". For other, smaller dataframes, I can see "s3_path" being created as soon as "write.parquet()" runs, but for this large dataframe no folders or files are created at "s3_path".
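To rule out a path or permission problem, one sanity check is to write only a small sample of the aggregated dataframe to the same bucket (the limit of 1000 rows and the "_sample" suffix are arbitrary choices for this check):

# write a small sample to confirm the bucket, path, and credentials work at all
df_temp_agg.limit(1000).write.parquet(s3_path + "_sample", mode='overwrite')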

Because the

df_temp_agg.write.parquet()

call never returns, I am not sure what errors might be happening on the Spark cluster or on S3.

Could anybody help me with this? Thanks.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
