'Calculate percentile with groupBy on PySpark dataframe

I am trying to groupBy and then calculate percentile on PySpark dataframe. I've tested the following piece of code according to this Stack Overflow post:

from pyspark.sql.types import FloatType
import pyspark.sql.functions as func
import numpy as np

qt_udf = func.udf(lambda x,qt: float(np.percentile(x,qt)), FloatType())
df_out = df_in.groupBy('Id').agg(func.collect_list('value').alias('data'))\
.withColumn('median', qt_udf(func.col('data'),func.lit(0.5)).cast("string"))  

df_out.show()

But get the following error:

Traceback (most recent call last): > df_out.show() ....> return lambda *a: f(*a) AttributeError: 'module' object has no attribute 'percentile'

This is because of numpy version (1.4.1), the percentile function was added from version 1.5. It is not possible to update numpy version in the short term.



Solution 1:[1]

Define a window and use the inbuilt percent_rank function to compute percentile values.

from pyspark.sql import Window
from pyspark.sql import functions as func
w = Window.partitionBy(df_in.Id).orderBy(df_in.value) #assuming default ascending order
df_out = df_in.withColumn('percentile_col',func.percent_rank().over(w))

Solution 2:[2]

Question's title suggests that OP wanted to calculate percentiles. But the body shows that he needed to calculate median in groups.

Test dataset:

from pyspark.sql import SparkSession, functions as F, Window as W, Window
spark = SparkSession.builder.getOrCreate()
df_in = spark.createDataFrame(
    [('1', 10),
     ('1', 11),
     ('1', 12),
     ('1', 13),
     ('2', 20)],
    ['Id', 'value']
)

Percentiles of given data points in groups:

w = W.partitionBy('Id').orderBy('value')
df_in = df_in.withColumn('percentile_of_value_by_Id', F.percent_rank().over(w))

df_in.show()
#+---+-----+-------------------------+
#| Id|value|percentile_of_value_by_Id|
#+---+-----+-------------------------+
#|  1|   10|                      0.0|
#|  1|   11|       0.3333333333333333|
#|  1|   12|       0.6666666666666666|
#|  1|   13|                      1.0|
#|  2|   20|                      0.0|
#+---+-----+-------------------------+

Median (accurate and approximate):

df_out = (df_in.groupBy('Id').agg(
    F.expr('percentile(value, .5)').alias('median_accurate'),  # for small-mid dfs
    F.percentile_approx('value', .5).alias('median_approximate')  # for mid-large dfs
))

df_out.show()
#+---+---------------+------------------+
#| Id|median_accurate|median_approximate|
#+---+---------------+------------------+
#|  1|           11.5|                11|
#|  2|           20.0|                20|
#+---+---------------+------------------+

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 ZygD