Group by and aggregate on a column with array in PySpark
I have the below PySpark dataframe, where Column_2 is of the complex data type array<map<string,bigint>>:
Column_1  Column_2           Column_3
A         [{Mat=7},{Phy=8}]  ABC
A         [{Mat=7},{Phy=8}]  CDE
B         [{Mat=6},{Phy=7}]  ZZZ
I have to group by Column_1 and Column_2 and get the minimum of Column_3.
The problem is that when I try to group by Column_1 and Column_2, it gives me an error:
cannot be used as grouping expression because the data type is not an orderable data type
Is there a way to include this column in the group by, or to aggregate it in some way? The values in Column_2 will always be the same for a given value of Column_1.
Expected output:
Column_1  Column_2           Column_3
A         [{Mat=7},{Phy=8}]  ABC
B         [{Mat=6},{Phy=7}]  ZZZ
Is it possible to do a collect_list of all the values in an aggregate function, then flatten it and remove duplicates?
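One possible workaround along these lines, sketched against the test dataframe built in Solution 1 below (and assuming a Spark version, 2.4+, where to_json handles array<map<string,bigint>>), is to group on a JSON-serialized copy of Column_2, since a string column is both orderable and groupable:
# Sketch: serialize Column_2 to JSON so it becomes a groupable string key,
# then recover the original array column with first(). 'c2_json' is just an
# illustrative helper name.
from pyspark.sql import functions as F
result = (df
          .groupBy('Column_1', F.to_json('Column_2').alias('c2_json'))
          .agg(F.first('Column_2').alias('Column_2'),
               F.min('Column_3').alias('Column_3'))
          .drop('c2_json'))
Serializing gives the grouping key a deterministic string form, while first() recovers the original array column.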
Solution 1:[1]
The values in Column_2 will always be the same for a given value of Column_1

If so, then you can just take the first value in the group.
Test dataframe:
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [('A', 'ABC', 7, 8),
     ('A', 'CDE', 7, 8),
     ('B', 'ZZZ', 6, 7)],
    ['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
    'Column_1',
    F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
    'Column_3'
)
df.show(truncate=False)
print(df.dtypes)
# +--------+------------------------+--------+
# |Column_1|Column_2 |Column_3|
# +--------+------------------------+--------+
# |A |[{Mat -> 7}, {Phy -> 8}]|ABC |
# |A |[{Mat -> 7}, {Phy -> 8}]|CDE |
# |B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
# +--------+------------------------+--------+
# [('Column_1', 'string'), ('Column_2', 'array<map<string,bigint>>'), ('Column_3', 'string')]
Aggregation:
df2 = df.groupBy('Column_1').agg(
    F.first('Column_2').alias('Column_2'),
    F.min('Column_3').alias('Column_3')
)
df2.show(truncate=False)
# +--------+------------------------+--------+
# |Column_1|Column_2 |Column_3|
# +--------+------------------------+--------+
# |A |[{Mat -> 7}, {Phy -> 8}]|ABC |
# |B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
# +--------+------------------------+--------+
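A caveat on F.first(): Spark does not guarantee which row it picks when values differ within a group, so this only works because Column_2 really is constant per Column_1. A quick sanity check for that assumption (a sketch, again relying on to_json supporting array<map<string,bigint>> in Spark 2.4+):
# Count the distinct JSON-serialized Column_2 values per group; any group with
# more than one variant would make F.first() pick an arbitrary one.
check = df.groupBy('Column_1').agg(
    F.countDistinct(F.to_json('Column_2')).alias('n_variants')
)
check.filter('n_variants > 1').show()   # expected to be empty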
Solution 2:[2]
I may have misunderstood your question; if I did, no harm done, someone else can benefit. I thought you wanted to select the row with the minimum aggregated sum of the values in Column_2, so I modified the dataframe slightly to ensure group A has multiple values; see df below.
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [('A', 'ABC', 7, 8),
     ('A', 'CDE', 3, 8),
     ('B', 'ZZZ', 6, 7)],
    ['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
    'Column_1',
    F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
    'Column_3'
)
df.show(truncate=False)
+--------+------------------------+--------+
|Column_1|Column_2 |Column_3|
+--------+------------------------+--------+
|A |[{Mat -> 7}, {Phy -> 8}]|ABC |
|A |[{Mat -> 3}, {Phy -> 8}]|CDE |
|B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
+--------+------------------------+--------+
Solution
If my assumption is right:
- Extract the values of the key-value pairs in Column_2 into a column called filter
- Aggregate them by summing; save the outcome in filter
- Order by Column_1 and filter
- Drop duplicates with subset Column_1
Code below:
new = (df
       .withColumn('filter', F.expr("aggregate(transform(Column_2, x -> map_values(x)[0]), cast(0 as bigint), (x, i) -> x + i)"))
       .orderBy('Column_1', F.desc('filter'))
       .dropDuplicates(['Column_1'])
       .drop('filter'))
new.show(truncate=False)
+--------+------------------------+--------+
|Column_1|Column_2 |Column_3|
+--------+------------------------+--------+
|A |[{Mat -> 7}, {Phy -> 8}]|ABC |
|B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
+--------+------------------------+--------+
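For readers on Spark 3.1+, the same per-row sum can be built with the Python column functions instead of a SQL expression string. This is a sketch assuming F.transform and F.aggregate are available (both were added to the Python API in 3.1):
# Equivalent of the F.expr("aggregate(transform(...))") expression above.
sums = F.aggregate(
    F.transform('Column_2', lambda m: F.map_values(m)[0]),   # value of each single-entry map
    F.lit(0).cast('bigint'),                                  # start value
    lambda acc, v: acc + v,                                   # running sum
)
new2 = (df.withColumn('filter', sums)
          .orderBy('Column_1', F.desc('filter'))
          .dropDuplicates(['Column_1'])
          .drop('filter'))
new2.show(truncate=False)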
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | ZygD
Solution 2 | wwnde