PySpark : Different Aggregate Alias for each group

Is it possible in Spark to do a groupBy and aggregate where the alias for the aggregate function is different for each group? For example, if I were doing a groupBy and AVG, I would want each group to get its own column name, such as "group_1_avg" for group 1, "group_2_avg" for group 2, and so on, with the idea that the final result is a list of columns group_1_avg, group_2_avg, etc.

I realize I probably cannot do this directly and could instead aggregate everything under one name and pivot it, but I am trying to avoid pivot because of how expensive it is for my data.

Things I've tried:

from pyspark.sql import functions as F

frame = frame.groupBy("Item", "Group", "Level").agg(F.avg("val").alias("avg_val"))

frame = frame.withColumn("Columns", F.concat_ws("_", F.col("Group"), F.col("Level"), F.lit("AVG")))

frame = frame.groupBy("Item").pivot("Columns").agg(F.first("avg_val"))

This works and does what I need, but the pivot becomes too expensive given the scale of my data, so I am looking for an alternative solution.
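For what it's worth, supplying the pivot values explicitly does help a little, since Spark can then skip the extra job it runs to discover the distinct pivot values. A minimal sketch, assuming the combinations from the sample data below (the known_cols list is hypothetical):

from pyspark.sql import functions as F

# Hypothetical: the distinct Group_Level_AVG combinations are assumed known.
known_cols = ["A_S1_AVG", "A_S2_AVG"]

# Passing the values list to pivot() skips the extra pass Spark otherwise
# makes over the data to compute the distinct pivot values itself.
frame = frame.groupBy("Item").pivot("Columns", known_cols).agg(F.first("avg_val"))

Even with the values supplied, the pivot step itself remains the bottleneck at my scale.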

Thank you for your time.

Input Format:

Item  Group  Level  val
W1    A      S1     40
W1    A      S1     40
W1    A      S2     25
W2    A      S1     50
W2    A      S1     50

Expected Output:

Item  A_S1_AVG  A_S2_AVG
W1    40.0      25.0
W2    50.0      null


Solution 1:[1]

For a large dataset, can you envision your data in this long format instead? Rather than constructing 20K columns, you would have 20K rows:

+----+-----------+----+
|Item|Group_Level|Mean|
+----+-----------+----+
|  W1|       A_S1|40.0|
|  W1|       A_S2|25.0|
|  W2|       A_S1|50.0|
+----+-----------+----+

If so,

from pyspark.sql import functions as F
from pyspark.sql import types as T

df = spark.createDataFrame(
    [('W1', 'A', 'S1', 40),
     ('W1', 'A', 'S1', 40),
     ('W1', 'A', 'S2', 25),
     ('W2', 'A', 'S1', 50),
     ('W2', 'A', 'S1', 50)],
    ["Item", "Group", "Level", "Val"])


# UDF that groups each collected row's Val under its "Group_Level" key and
# returns a map of key -> mean(Val) for a single Item.
@F.udf(T.MapType(T.StringType(), T.FloatType()))
def create_group_scores(data):
    data_map = {}
    mean_map = {}
    for datum in data:
        key = f"{datum.Group}_{datum.Level}"
        if key in data_map:
            data_map[key].append(datum.Val)
        else:
            data_map[key] = [datum.Val]
    for key in data_map:
        mean_map[key] = sum(data_map[key]) / len(data_map[key])
    return mean_map

# Collect each Item's (Group, Level, Val) rows, then build the key -> mean map.
item_groups = (df.groupBy("Item")
                 .agg(F.collect_list(F.struct("Group", "Level", "Val")).alias("group_level_val"))
                 .withColumn("group_scores", create_group_scores("group_level_val")))

# Explode the map into one (Group_Level, Mean) row per key.
item_groups = item_groups.select("Item", F.explode_outer("group_scores").alias("Group_Level", "Mean"))
item_groups.show()
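If a small, known set of combinations is still wanted as actual columns (as in the expected output above), they can be read out of the map by key, which avoids pivot entirely. A minimal sketch, assuming the A_S1 and A_S2 keys from the sample data; with_map and wide are hypothetical names:

# Hypothetical: reuse the pre-explode frame that still carries the
# "group_scores" map column; a key absent for an Item yields null.
with_map = (df.groupBy("Item")
              .agg(F.collect_list(F.struct("Group", "Level", "Val")).alias("group_level_val"))
              .withColumn("group_scores", create_group_scores("group_level_val")))

wide = with_map.select(
    "Item",
    F.col("group_scores")["A_S1"].alias("A_S1_AVG"),
    F.col("group_scores")["A_S2"].alias("A_S2_AVG"))
wide.show()

This keeps the heavy lifting to a single groupBy and only materializes the handful of columns that are actually needed.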

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source

Solution 1: greenie