PySpark: Different Aggregate Alias for each group
Is it possible in Spark to do a groupBy and aggregate where the alias for the aggregate function is different for each group? For example, if I were doing a groupBy and AVG, I would want each group to get its own column name, such as "group_1_avg" for group 1 and "group_2_avg" for group 2, so that the final result is a set of columns group_1_avg, group_2_avg, etc.
I realize I could instead aggregate everything under one name and pivot the result, but I am trying to avoid pivot because of how expensive it is on my data.
Things I've tried:
# Aggregate per (Item, Group, Level), build a "Group_Level_AVG" label, then pivot on it
frame = frame.groupBy("Item", "Group", "Level").agg(F.avg("val").alias("avg_val"))
frame = frame.withColumn("Columns", F.concat(F.col("Group"), F.lit("_"), F.col("Level"), F.lit("_"), F.lit("AVG")))
frame = frame.groupBy("Item").pivot("Columns").agg(F.first("avg_val"))
This works and does what I need but the problem I have is that the pivot becomes too expensive given the scale of my data so I am looking for an alternate solution.
Thank you for your time.
Input Format:
| Item | Group | Level | val |
|---|---|---|---|
| W1 | A | S1 | 40 |
| W1 | A | S1 | 40 |
| W1 | A | S2 | 25 |
| W2 | A | S1 | 50 |
| W2 | A | S1 | 50 |
Expected Output:
| Item | A_S1_AVG | A_S2_AVG |
|---|---|---|
| W1 | 40.0 | 25.0 |
| W2 | 50.0 | null |
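One thing worth knowing about pivot (not from the original post): if the set of pivot values is known in advance, it can be passed as pivot's second argument, which spares Spark the extra job of computing the distinct values first. A minimal sketch, reusing frame and the Columns column from the snippet above:
# Supplying the values up front skips the extra pass that collects distinct pivot values
wide = frame.groupBy("Item").pivot("Columns", ["A_S1_AVG", "A_S2_AVG"]).agg(F.first("avg_val"))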
Solution 1:[1]
For a large dataset, can you envision your data in this format instead? (Rather than constructing 20K columns(!), you could have 20K rows.)
+----+-----------+----+
|Item|Group_Level|Mean|
+----+-----------+----+
| W1| A_S1|40.0|
| W1| A_S2|25.0|
| W2| A_S1|50.0|
+----+-----------+----+
If so,
from pyspark.sql import functions as F
from pyspark.sql import types as T
df = spark.createDataFrame([('W1', 'A', 'S1', 40),
                            ('W1', 'A', 'S1', 40),
                            ('W1', 'A', 'S2', 25),
                            ('W2', 'A', 'S1', 50),
                            ('W2', 'A', 'S1', 50)], ["Item", "Group", "Level", "Val"])
@F.udf(T.MapType(T.StringType(), T.FloatType()))
def create_group_scores(data):
    # Bucket the collected rows by a "Group_Level" key, then average each bucket
    data_map = {}
    mean_map = {}
    for datum in data:
        key = f"{datum.Group}_{datum.Level}"
        if key in data_map:
            data_map[key].append(datum.Val)
        else:
            data_map[key] = [datum.Val]
    for key in data_map:
        mean_map[key] = sum(data_map[key]) / len(data_map[key])
    return mean_map
# Collect each Item's (Group, Level, Val) structs, then map them to per-key means
item_groups = df.groupBy("Item").agg(
    F.collect_list(F.struct("Group", "Level", "Val")).alias("group_level_val")
).withColumn("group_scores", create_group_scores("group_level_val"))
# Explode the map into one (Group_Level, Mean) row per key
item_groups = item_groups.select("Item", F.explode_outer("group_scores").alias("Group_Level", "Mean"))
item_groups.show()
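For what it's worth, the same long format can also be produced with built-in aggregations alone, which keeps the work inside the JVM instead of routing it through a Python UDF. A minimal sketch against the df defined above (Group_Level and Mean are just the names used here):
from pyspark.sql import functions as F

# Aggregate on all three keys, then fold Group and Level into a single label
item_groups = (df.groupBy("Item", "Group", "Level")
                 .agg(F.avg("Val").alias("Mean"))
                 .select("Item",
                         F.concat_ws("_", "Group", "Level").alias("Group_Level"),
                         "Mean"))
item_groups.show()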
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | greenie |
