Calculate MAPE and apply to PySpark grouped DataFrame [@pandas_udf]

Goal: Calculate mean_absolute_percentage_error (MAPE) for each unique ID.

  • y - real value
  • yhat - predicted value

Sample PySpark Dataframe: join_df

+----------+----------+-------+---------+----------+----------+
|        ID|        ds|      y|     yhat|yhat_upper|yhat_lower|
+----------+----------+-------+---------+----------+----------+
|    Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
|    Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929|   693.786|
|    Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253|  664.1514|
|    Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392|  639.4879|
|    Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
|    Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268|  665.9529|
+----------+----------+-------+---------+----------+----------+

final_schema =StructType([
  StructField('ds',DateType()),
  StructField('ID',IntegerType()),
  StructField('y',FloatType()),
  StructField('yhat',FloatType()),
  StructField('yhat_upper',FloatType()),
  StructField('yhat_lower',FloatType()),
  StructField('mape',FloatType())
  ])

I tried creating a pandas UDF and applying it to each ID group:

from sklearn.metrics import mean_absolute_percentage_error
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
  
  mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"]) 
  join_df['mape'] = mape
  
  return join_df

df_apply = join_df.groupby('ID').applyInPandas(gr_mape_val, final_schema)
df_apply.show()

However, I am getting the error:

PythonException: 'TypeError: Return type of the user-defined function should be pandas.DataFrame, but is <class 'numpy.float32'>'

I understand that MAPE comes back as a NumPy value while a DataFrame is expected, but I am not sure what exactly needs to be done differently to get the MAPE for each ID.



Solution 1:[1]

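A function used with PandasUDFType.GROUPED_MAP must return a pandas DataFrame; the exception is raised because a NumPy value is returned instead.

The schema must also match the DataFrame that the grouped function returns: here ID and ds are strings, and the new mape column has to be included.

You should also prefer applyInPandas, which takes the plain (undecorated) Python function rather than one wrapped in pandas_udf. I have added its usage as well.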
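To make the return-type requirement concrete, here is a minimal pandas-only sketch that can be run locally before involving Spark. The names `mape` and `add_group_mape` are hypothetical, and the hand-written formula stands in for sklearn's `mean_absolute_percentage_error`, assuming `y` is never zero. It shows that the scalar MAPE is broadcast into a new column and that the function hands back the whole DataFrame.

```python
import numpy as np
import pandas as pd


def mape(y_true, y_pred):
    """Mean absolute percentage error as a fraction (0.1 means 10%).

    Assumes y_true contains no zeros; a zero would produce inf.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    return float(np.mean(np.abs((y_true - y_pred) / y_true)))


def add_group_mape(pdf: pd.DataFrame) -> pd.DataFrame:
    """Take one group's rows and return them with a constant 'mape' column."""
    out = pdf.copy()
    out["mape"] = mape(out["y"], out["yhat"])  # scalar is broadcast to every row
    return out  # a DataFrame, not the scalar itself


# Subset of the sample data from the question (only the columns needed here)
sample = pd.DataFrame({
    "ID": ["Ax849b"] * 4 + ["Bz383J"] * 2,
    "y": [1165.59, 1120.69, 1120.69, 1120.69, 1108.71, 1062.77],
    "yhat": [1298.809, 1295.552, 1294.079, 1295.0399, 1159.4934, 1191.2385],
})

# Emulate the per-group call locally with a plain pandas groupby
checked = pd.concat([add_group_mape(g) for _, g in sample.groupby("ID")])
print(checked)
```

Checking the function this way makes it easier to see what each group call receives and returns before the same logic is run through Spark.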

Data Preparation

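from io import StringIO

import pandas as pd
from pyspark.sql import SparkSession

# `sql` is assumed here to be an active SparkSession
sql = SparkSession.builder.getOrCreate()

s = StringIO("""
ID,ds,y,yhat,yhat_upper,yhat_lower
Ax849b,2021-07-01,1165.59, 1298.809, 1939.1261, 687.48206
Ax849b,2021-07-02,1120.69, 1295.552, 1892.4929,   693.786
Ax849b,2021-07-03,1120.69, 1294.079, 1923.0253,  664.1514
Ax849b,2021-07-04,1120.69,1295.0399, 1947.6392,  639.4879
Bz383J,2021-07-03,1108.71,1159.4934, 1917.6515, 652.76624
Bz383J,2021-07-04,1062.77,1191.2385, 1891.9268,  665.9529
""")

# Load the sample into pandas, then convert it to a Spark DataFrame
df = pd.read_csv(s, delimiter=',')

sparkDF = sql.createDataFrame(df)

sparkDF.show()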

+------+----------+-------+---------+----------+----------+
|    ID|        ds|      y|     yhat|yhat_upper|yhat_lower|
+------+----------+-------+---------+----------+----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
|Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929|   693.786|
|Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253|  664.1514|
|Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392|  639.4879|
|Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
|Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268|  665.9529|
+------+----------+-------+---------+----------+----------+

Pandas UDF - Usage

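from pyspark.sql import functions as F
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from sklearn.metrics import mean_absolute_percentage_error

final_schema = StructType([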
  StructField('ID',StringType()),
  StructField('ds',StringType()),
  StructField('y',FloatType()),
  StructField('yhat',FloatType()),
  StructField('yhat_lower',FloatType()),
  StructField('yhat_upper',FloatType()),
  StructField('mape',FloatType())
  ])

@F.pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
    
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"]) 
    
    join_df['mape'] = mape
    
    return join_df


sparkDF.groupby('ID').apply(gr_mape_val).show()

+------+----------+-------+---------+----------+----------+-----------+
|    ID|        ds|      y|     yhat|yhat_lower|yhat_upper|       mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552|   693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079|  664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399|  639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385|  665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+

applyInPandas

final_schema =StructType([
  StructField('ID',StringType()),
  StructField('ds',StringType()),
  StructField('y',FloatType()),
  StructField('yhat',FloatType()),
  StructField('yhat_lower',FloatType()),
  StructField('yhat_upper',FloatType()),
  StructField('mape',FloatType())
  ])


# Plain Python function: applyInPandas does not need the pandas_udf decorator
def gr_mape_val(join_df):
    
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"]) 
    
    join_df['mape'] = mape
    
    return join_df


sparkDF.groupby('ID').applyInPandas(gr_mape_val,final_schema).show()

+------+----------+-------+---------+----------+----------+-----------+
|    ID|        ds|      y|     yhat|yhat_lower|yhat_upper|       mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552|   693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079|  664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399|  639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385|  665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+
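Both variants above repeat the MAPE on every row of a group. If only one row per ID is wanted, the grouped function can return a one-row DataFrame instead. The following is a sketch under assumptions: `mape_per_id` is a hypothetical name, the formula is written out with NumPy rather than sklearn (assuming `y` is never zero), and the function is exercised with a pandas groupby so it runs without a Spark session.

```python
import numpy as np
import pandas as pd


def mape_per_id(pdf: pd.DataFrame) -> pd.DataFrame:
    """Collapse one ID's rows into a single (ID, mape) row."""
    ape = np.abs((pdf["y"] - pdf["yhat"]) / pdf["y"])  # per-row absolute % error
    return pd.DataFrame({
        "ID": [pdf["ID"].iloc[0]],     # every row in the group shares this ID
        "mape": [float(ape.mean())],
    })


# Subset of the sample data from the question (only the columns needed here)
sample = pd.DataFrame({
    "ID": ["Ax849b"] * 4 + ["Bz383J"] * 2,
    "y": [1165.59, 1120.69, 1120.69, 1120.69, 1108.71, 1062.77],
    "yhat": [1298.809, 1295.552, 1294.079, 1295.0399, 1159.4934, 1191.2385],
})

# Local stand-in for the grouped call: one summary row per ID
summary = pd.concat(
    [mape_per_id(g) for _, g in sample.groupby("ID")],
    ignore_index=True,
)
print(summary)
```

With Spark, the same function should be usable as `sparkDF.groupby('ID').applyInPandas(mape_per_id, schema="ID string, mape double")`. The schema then describes only the two returned columns, not the input columns.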

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1