'How to distribute map(...) operation over cluster?
I am running a distributed operation in Databricks 10.2, Spark 3.2.0, Python 3.8 which queries an underlying Delta table.
After querying, grouping by a column and collecting the rows within each group, I need to perform a complex algorithm on each group's elemenets and save the results.
This complex algorithm is written in Python code and is triggered for each group through the rdd.map() operation:
raw_data_groups_df = spark \
.read \
.format('delta') \
.load(f"dbfs:/mnt/the-table-path/...") \
.select('col1', 'col2','col3') \
.filter(f.col('ts').between(from_date, to_date)) \
.groupBy('col1') \
.agg(f.collect_set(f.struct('col2', 'col3'))) \
.limit(2000) # For testing on small dataset....
def run_algorithm_on_one_group(col1: str, list_of_col2_col3_in_group):
# Perform the amazing algorithm calling various specialist python code & packages
# !!! this mapping should be parallelized but is actually sequential!!
results_rdd = raw_data_groups_df.rdd.map(lambda group: run_algorithm_on_one_group(group[0], group[1]))
output_schema = StructType(.....)
output_df = spark.createDataFrame(results_rdd , output_schema)
output_df \
.write.format("csv") \
.mode("overwrite") \
.save("dbfs:/mnt/path-to-the-output-the-data.csv")
The problem, I discovered that the map(...) operation is not parallelized, as proven from external logs generated which show that each element processed starts strictly after the previous one completed. Hence the cluster is only getting utilized for the initial query, and not for the processing (which is the more important part to distribute as takes much longer than the querying!)
How can I ensure the map operation calling the custom algorithm is distributed as well?
If it cannot be distributed to the worker nodes in the cluster, can I at least process it using python's multiprocessing library to distribute across cores on the main node?
Or - can I manually create Spark jobs for each grouping so the jobs get distributed over the cluster?
Or - can a UDF wrap the algorithm, bearing in mind that it imports external files & packages and is quote complex?
Solution 1:[1]
I never found out why the rdd.map() was not distributed across nodes (I understand in general it can be... but logs & timestamps proved otherwise in this particular case).
However I did manage to decorate my function as a udf and set up the output structures, then use withColumn() to attach the algorithm results to a new column, which I proved from logs was fully distributed:
raw_data_groups_df = spark \
.read \
.format('delta') \
.load(f"dbfs:/mnt/the-table-path/...") \
.select('col1', 'col2','col3') \
.filter(f.col('ts').between(from_date, to_date)) \
.groupBy('col1') \
.agg(f.collect_set(f.struct('col2', 'col3')).alias('algo_input_data')) \
.limit(2000) # For testing on small dataset....
output_schema = StructType(.....)
@udf(returnType=ArrayType(output_schema))
def run_algorithm_on_one_group(col1: str, list_of_col2_col3_in_group):
# Perform the amazing algorithm calling various specialist python code & packages
# Special care taken to set environment path variables so that local imports work
return [(val1,val2,...) ... ]
with_results_df = raw_data_groups_df.withColumn('algo_results', run_algorithm_on_one_group(f.col('col1'), f.col('algo_input_data')))
with_results_df \
.select(f.explode('algo_results')) \
.select('algo_results.*') \
.write.format("csv") \
.mode("overwrite") \
.save("dbfs:/mnt/path-to-the-output-the-data.csv")
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
