pyspark foreach does not parallelize when applied to a DataFrame that was aggregated (but does on a normal DataFrame)

I am using pyspark.sql.dataframe.DataFrame in pyspark. I have one driver and 3 executors/workers.

When I apply a function to each row and have it run on one of the 3 executors, it works on a normal DataFrame, but if I have done a groupBy and agg on the DataFrame, then everything runs on the same executor/worker.

data = [('James','Smith','apples','a'),
        ('James','Smith','oranges','b'),
        ('James','Smith','lemons','a'),
        ('Anna','Rose','apples','a'),
        ('Anna','Rose','lemons','b'),
        ('Robert','Williams','oranges','v')]
columns = ["firstname","lastname","fuits","letter"]

df = spark.createDataFrame(data=data, schema=columns)
df.show()

+---------+--------+-------+------+
|firstname|lastname|  fuits|letter|
+---------+--------+-------+------+
|    James|   Smith| apples|     a|
|    James|   Smith|oranges|     b|
|    James|   Smith| lemons|     a|
|     Anna|    Rose| apples|     a|
|     Anna|    Rose| lemons|     b|
|   Robert|Williams|oranges|     v|
+---------+--------+-------+------+

from pyspark.sql import functions

dfagg = df.groupBy("firstname", "lastname").agg(functions.collect_list("fuits"), functions.collect_list("letter"))

dfagg.show()

+---------+--------+--------------------+--------------------+
|firstname|lastname| collect_list(fuits)|collect_list(letter)|
+---------+--------+--------------------+--------------------+
|    James|   Smith|[apples, oranges,...|           [a, b, a]|
|     Anna|    Rose|    [lemons, apples]|              [b, a]|
|   Robert|Williams|           [oranges]|                 [v]|
+---------+--------+--------------------+--------------------+

I then apply foreach to each DataFrame with a simple function:

# Foreach example
def f(x): 
    print(x)
    print(' ===== > this is in the simple foreach')
        
def f2(x): 
    print(x)
    print(' ===== > this is in the aggregated foreach')

# foreach applied to the normal dataframe
df.foreach(f)

# foreach applied to the dataframe that was aggregated
dfagg.foreach(f2)

  • For the normal DataFrame and foreach I get the expected outcome:
    the prints of the 6 rows are distributed across the 3 executors/workers.

On the executor 1

Row(firstname='James', lastname='Smith', fuits='lemons', letter='a')
 ===== > this is in the simple foreach
Row(firstname='Anna', lastname='Rose', fuits='apples', letter='a')
 ===== > this is in the simple foreach

On the executor 2

Row(firstname='Anna', lastname='Rose', fuits='lemons', letter='b')
 ===== > this is in the simple foreach
Row(firstname='Robert', lastname='Williams', fuits='oranges', letter='v')
 ===== > this is in the simple foreach

On the executor 3

Row(firstname='James', lastname='Smith', fuits='apples', letter='a')
 ===== > this is in the simple foreach
Row(firstname='James', lastname='Smith', fuits='oranges', letter='b')
 ===== > this is in the simple foreach

  • But for foreach on the aggregated DataFrame:
    everything goes to the same executor/worker.

Row(firstname='James', lastname='Smith', collect_list(fuits)=['lemons', 'apples', 'oranges'], collect_list(letter)=['a', 'a', 'b'])
 ===== > this is in the aggregated foreach
Row(firstname='Anna', lastname='Rose', collect_list(fuits)=['apples', 'lemons'], collect_list(letter)=['a', 'b'])
 ===== > this is in the aggregated foreach
Row(firstname='Robert', lastname='Williams', collect_list(fuits)=['oranges'], collect_list(letter)=['v'])
 ===== > this is in the aggregated foreach

How can I distribute the per-row function across the 3 executors?

I need to work on an aggregated DataFrame and perform fairly long-running functions on each aggregated row, so the work needs to be distributed; otherwise it takes too long.

I have tried with more data to see if there is a minimum amount of data required -> no change.

Both DataFrames are of the same type:

print( 'Type of df : ', type(df) )
print( 'Type of dfagg : ', type(dfagg) )

Type of df :  <class 'pyspark.sql.dataframe.DataFrame'>
Type of dfagg :  <class 'pyspark.sql.dataframe.DataFrame'>

Thank you very much



Solution 1:[1]

Sounds like you have data skew. I can't think of why else all the data would end up on one node.
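To confirm that, it is worth checking how many partitions the aggregated DataFrame actually has and how many rows land in each one. A minimal sketch, assuming the same spark session and the dfagg from the question:

# Number of partitions of the aggregated DataFrame
print(dfagg.rdd.getNumPartitions())

# Rows per partition: if nearly all rows sit in a single partition,
# the foreach will effectively run on a single executor
print(dfagg.rdd.glom().map(len).collect())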

If you are using a recent version (Spark 3.0+), you could use Adaptive Query Execution to help automagically adjust your shuffle partitions.
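A minimal sketch of turning it on (these are standard Spark 3.x settings; defaults and behaviour vary by version):

# Let Spark re-optimize the shuffle partitioning at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Allow AQE to coalesce small shuffle partitions based on the actual data size
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")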

If you aren't so lucky (not on Spark 3.0), you could try DISTRIBUTE BY, set an appropriate spark.sql.shuffle.partitions value, or repartition. This should hopefully redistribute the data in a way that lets you use all of your cluster.
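For example, a sketch of the repartition route; the partition count of 3 below is only an illustration matching the 3 executors in the question, not a recommendation:

# Spread the aggregated rows over several partitions before the foreach
dfagg.repartition(3).foreach(f2)

# Or lower the number of partitions produced by the groupBy shuffle itself
spark.conf.set("spark.sql.shuffle.partitions", "3")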

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: Matt Andruff