Pyspark - find the oldest date in a nested collection
I have the following dataframe
root
|-- AUTHOR_ID: integer (nullable = false)
|-- Books: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- DATE: timestamp (nullable = true)
How do I find the oldest published book for each author? I want to retrieve the date:
{
  "AUTHOR_ID": 1,
  "FIRST_PUBLICATION": <Date>,
  "Books": [ ... ]
}
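Before turning to Spark, the intent can be sketched in plain Python: group the nested book lists per author and take the minimum date. The names and sample values below are illustrative, not from the question.

```python
from datetime import date

# Illustrative rows mirroring the schema: each author carries a nested book list.
rows = [
    {"AUTHOR_ID": 1, "Books": [
        {"NAME": "A", "DATE": date(2020, 5, 1)},
        {"NAME": "B", "DATE": date(2018, 3, 9)},
    ]},
]

# For each author, FIRST_PUBLICATION is the minimum DATE in Books.
result = [
    {"AUTHOR_ID": r["AUTHOR_ID"],
     "FIRST_PUBLICATION": min(b["DATE"] for b in r["Books"]),
     "Books": r["Books"]}
    for r in rows
]
print(result[0]["FIRST_PUBLICATION"])  # 2018-03-09
```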
Solution 1:[1]
There are many ways of doing this; let's try window functions.
root
|-- AUTHOR_ID: integer (nullable = false)
|-- Books: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- DATE: date (nullable = true)
+---------+--------------------------------+
|AUTHOR_ID|Books |
+---------+--------------------------------+
|21 |[{Stories of Mary, 2019-12-01}] |
|34 |[{Sorrows of Mary, 2019-09-01}] |
|34 |[{Sparrows of Mary, 2019-06-16}]|
|21 |[{Songs of Mary, 2017-03-14}] |
+---------+--------------------------------+
Following your edits:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

win = Window.partitionBy('AUTHOR_ID').orderBy(F.asc('Books.Date'))
df1 = (
    df.withColumn('rank', F.row_number().over(win))
      .where(F.col('rank') == 1).drop('rank')  # keep the row with the oldest date
      .withColumn('value', F.to_json(F.struct(F.col('AUTHOR_ID'), F.col('Books.Date').alias('FIRST_PUBLICATION'), 'Books')))  # create JSON column
)
df1.select('value').show(truncate=False)
+-------------------------------------------------------------------------------------------------------------+
|value |
+-------------------------------------------------------------------------------------------------------------+
|{"AUTHOR_ID":21,"FIRST_PUBLICATION":["2017-03-14"],"Books":[{"NAME":"Songs of Mary","DATE":"2017-03-14"}]} |
|{"AUTHOR_ID":34,"FIRST_PUBLICATION":["2019-06-16"],"Books":[{"NAME":"Sparrows of Mary","DATE":"2019-06-16"}]}|
+-------------------------------------------------------------------------------------------------------------+
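The window logic above — partition by author, order by date, keep the row numbered 1 — can be mimicked in plain Python for the same sample data (a sketch for intuition, not Spark code):

```python
from itertools import groupby

# Sample rows as (AUTHOR_ID, NAME, DATE) tuples; ISO date strings sort correctly.
rows = [
    (21, "Stories of Mary", "2019-12-01"),
    (34, "Sorrows of Mary", "2019-09-01"),
    (34, "Sparrows of Mary", "2019-06-16"),
    (21, "Songs of Mary", "2017-03-14"),
]

# partitionBy('AUTHOR_ID').orderBy('DATE'), then keep row_number() == 1:
rows.sort(key=lambda r: (r[0], r[2]))
oldest = {author: next(group) for author, group in groupby(rows, key=lambda r: r[0])}
print(oldest[21])  # (21, 'Songs of Mary', '2017-03-14')
```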
Solution 2:[2]
For Spark 3, using Spark's higher-order functions is the best solution:
from datetime import datetime
from pyspark.sql import Row

df = spark.createDataFrame(
    [("1", [Row(NAME="xs", DATE=datetime.strptime('2022-04-06 00:00:00', '%Y-%m-%d %H:%M:%S')),
            Row(NAME="s", DATE=datetime.strptime('2022-04-05 00:00:00', '%Y-%m-%d %H:%M:%S'))])],
    'struct<AUTHOR_ID:string,Books:array<struct<NAME:string,DATE:timestamp>>>')
df.show(truncate=False)
+---------+-----------------------------------------------------+
|AUTHOR_ID|Books |
+---------+-----------------------------------------------------+
|1 |[{xs, 2022-04-06 00:00:00}, {s, 2022-04-05 00:00:00}]|
+---------+-----------------------------------------------------+
df.printSchema()
root
|-- AUTHOR_ID: string (nullable = true)
|-- Books: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- DATE: timestamp (nullable = true)
We can get the earliest publication date for each author as follows:
from pyspark.sql import functions as f

# Fold over the Books array (requires Spark >= 3.1), seeding with a far-future
# timestamp so any real date replaces it, and keep the least DATE seen so far.
df = df.withColumn('FIRST_PUBLICATION',
    f.aggregate(
        'Books',
        f.lit(datetime.strptime('2222-02-22 22:22:22', '%Y-%m-%d %H:%M:%S')),
        lambda acc, b: f.least(acc, b['DATE'])
    )
)
Result
# df.show()
+---------+--------------------+-------------------+
|AUTHOR_ID| Books| FIRST_PUBLICATION|
+---------+--------------------+-------------------+
| 1|[{xs, 2022-04-06 ...|2022-04-05 00:00:00|
+---------+--------------------+-------------------+
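The fold that f.aggregate performs has a direct plain-Python analogue using functools.reduce: start from the sentinel seed and keep the smaller date at each step. The values below mirror the sample data; this is an illustration, not Spark code.

```python
from datetime import datetime
from functools import reduce

books = [
    {"NAME": "xs", "DATE": datetime(2022, 4, 6)},
    {"NAME": "s",  "DATE": datetime(2022, 4, 5)},
]

# Seed with a far-future timestamp, then keep the least DATE seen so far,
# mirroring f.aggregate('Books', f.lit(seed), lambda acc, b: f.least(acc, b['DATE'])).
seed = datetime(2222, 2, 22, 22, 22, 22)
first_publication = reduce(lambda acc, b: min(acc, b["DATE"]), books, seed)
print(first_publication)  # 2022-04-05 00:00:00
```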
Solution 3:[3]
Since Spark 2.4, you can use the array_min function to retrieve the minimum element of an array. Apply it to an array containing only the dates, which you can build with the getField method on the Books column.
Here is the complete code:
from pyspark.sql import functions as F
df = df.withColumn('FIRST_PUBLICATION', F.array_min(F.col('Books').getField('DATE')))
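In plain-Python terms, getField('DATE') corresponds to extracting the DATE field from each struct, and array_min to taking the minimum of that list (illustrative values, same sample data as Solution 2):

```python
from datetime import datetime

books = [
    {"NAME": "xs", "DATE": datetime(2022, 4, 6)},
    {"NAME": "s",  "DATE": datetime(2022, 4, 5)},
]

# getField('DATE') collects the dates; array_min takes the smallest.
dates = [b["DATE"] for b in books]
first_publication = min(dates)
print(first_publication)  # 2022-04-05 00:00:00
```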
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Smaillns |
| Solution 3 | Vincent Doba |
