After split-apply-combine in PySpark the code does not show the final result (from the book "Data Analysis with Python and PySpark" by Jonathan Rioux)

After split-apply-combine in PySpark, the code does not show the final result (the example comes from the book "Data Analysis with Python and PySpark" by Jonathan Rioux). The code seems to be working, but fails to "show" the results. NOTE: This is not a machine-learning exercise — scikit-learn is used only as plumbing to create a feature. The steps are: creating a grouped aggregate UDF; creating a group map UDF to scale temperature values; performing split-apply-combine in PySpark; and finally moving one station's single month of data into a local pandas DataFrame.

> The code seems to be working, but fails to "show" results:

    import pyspark
    import os
    import sys
    import pandas as pd
    from pyspark.sql import SparkSession
    # from pyspark.sql.functions import col, explode
    from functools import reduce
    import pyspark.sql.types as T
    import pyspark.sql.functions as F
    from sklearn.linear_model import LinearRegression

    # Point PySpark at the right interpreter BEFORE the session is created:
    # setting these variables after getOrCreate() has no effect on an
    # already-running SparkContext and its Python workers.
    os.environ['PYSPARK_PYTHON'] = 'C:/bigdatasetup/anaconda3/envs/pyspark-env/python.exe'
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/bigdatasetup/anaconda3/envs/pyspark-env/python.exe'
    #os.environ['PYTHONPATH'] = ":".join(sys.path)

    spark = SparkSession.builder.appName("MyApp").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    # Union the 2010-2019 GSOD yearly parquet files into a single DataFrame,
    # drop rows missing the key columns, and remove the 9999.9 sentinel temps.
    gsod = (
        reduce(
            lambda x, y: x.unionByName(y, allowMissingColumns=True),
            [
                # The path must be ONE string literal: the original had a
                # line break inside the f-string, which is a SyntaxError.
                spark.read.parquet(
                    f"C:/bigdatasetup/spark/data/gsod_noaa/gsod{year}.parquet"
                )
                for year in range(2010, 2020)
            ],
        )
        .dropna(subset=["year", "mo", "da", "temp"])
        .where(F.col("temp") != 9999.9)
        .drop("date")
    )


> Listing 9.8 Creating a grouped aggregate UDF

    @F.pandas_udf(T.DoubleType())
    def rate_of_change_temperature(day: pd.Series, temp: pd.Series) -> float:
        """Grouped aggregate UDF: slope of temp vs. day via linear regression.

        The original had a garbled return annotation (`-   > float`), which
        is a SyntaxError; it must be the arrow `->`.
        """
        return (
            LinearRegression()
            .fit(X=day.astype(int).values.reshape(-1, 1), y=temp)
            .coef_[0]
        )


> # Listing 9.10 A group map UDF to scale temperature values

    def scale_temperature(temp_by_day: pd.DataFrame) -> pd.DataFrame:
        """Min-max scale a group's `temp` into [0, 1] as a new `temp_norm` column.

        A group whose temperature is constant would divide by zero, so it
        gets the midpoint value 0.5 instead.
        """
        out = temp_by_day[["stn", "year", "mo", "da", "temp"]]
        t_min, t_max = out.temp.min(), out.temp.max()
        if t_min == t_max:
            return out.assign(temp_norm=0.5)
        return out.assign(temp_norm=(out.temp - t_min) / (t_max - t_min))


> # Listing 9.11 Split-apply-combine in PySpark

    # Split-apply-combine: run scale_temperature independently on every
    # (station, year, month) group; the schema string declares the columns
    # the UDF's returned DataFrame contains.
    result_schema = (
        "stn string, year string, mo string, "
        "da string, temp double, temp_norm double"
    )
    gsod_map = gsod.groupby("stn", "year", "mo").applyInPandas(
        scale_temperature, schema=result_schema
    )

    gsod_map.show(5, False)
> # Here the showing does not work


> # Listing 9.12 Moving one station, one month’s worth of data into a local pandas DataFrame

    # Filter gsod_map, NOT gsod: only gsod_map carries the temp_norm column
    # produced by scale_temperature. Filtering gsod and then indexing
    # gsod_local["temp_norm"] is exactly what raised KeyError: 'temp_norm'.
    gsod_local = gsod_map.where(
        "year = '2018' and mo = '08' and stn = '710920'"
    ).toPandas()

    # .func unwraps the pandas_udf decorator so the plain Python function
    # can be called locally on pandas Series.
    print(
        rate_of_change_temperature.func(
            gsod_local["da"], gsod_local["temp_norm"]
        )
    )
> **It gives this error:**
> ```
> File "C:\bigdatasetup\anaconda3\envs\pyspark-env\lib\site-packages
> \pandas\core\indexes\base.py", line 3623, in get_loc
>     raise KeyError(key) from err
> KeyError: 'temp_norm'
> ```


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source