After split-apply-combine in PySpark, the code does not `show` the final result (example from *Data Analysis with Python and PySpark* by Jonathan Rioux). The code appears to run, but fails when showing/using the results. NOTE: this is not a machine-learning exercise — scikit-learn is only used as plumbing to create a feature. The steps are: create a grouped aggregate UDF, then a grouped map UDF to scale temperature values, perform the split-apply-combine in PySpark, and finally move one station's one month of data into a local pandas DataFrame.
> The code seems to be working, but fails to "show" results:
import pyspark
import os
import sys
import pandas as pd
from pyspark.sql import SparkSession
# from pyspark.sql.functions import col, explode
from functools import reduce
import pyspark.sql.types as T
import pyspark.sql.functions as F
from sklearn.linear_model import LinearRegression
spark = pyspark.sql.SparkSession.builder.appName("MyApp").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
os.environ['PYSPARK_PYTHON'] = 'C:/bigdatasetup/anaconda3/envs/pyspark-env/python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/bigdatasetup/anaconda3/envs/pyspark-env/python.exe'
#os.environ['PYTHONPATH'] = ":".join(sys.path)
gsod = (
reduce(
lambda x, y: x.unionByName(y, allowMissingColumns=True),
[
spark.read.parquet(
f"C:/bigdatasetup/spark/data/gsod_noaa/gsod{year}
.parquet")
for year in range(2010, 2020)
],
)
.dropna(subset=["year", "mo", "da", "temp"])
.where(F.col("temp") != 9999.9)
.drop("date")
)
> Listing 9.8 Creating a grouped aggregate UDF
@F.pandas_udf(T.DoubleType())
def rate_of_change_temperature(day: pd.Series, temp: pd.Series) - > float:
return (
LinearRegression()
.fit(X=day.astype(int).values.reshape(-1, 1), y=temp)
.coef_[0]
)
> #Listing 9.10 A group map UDF to scale temperature values
def scale_temperature(temp_by_day: pd.DataFrame) -> pd.DataFrame:
temp = temp_by_day.temp
answer = temp_by_day[["stn", "year", "mo", "da", "temp"]]
if temp.min() == temp.max():
return answer.assign(temp_norm = 0.5)
return answer.assign(
temp_norm = (temp-temp.min()) / (temp.max()-temp.min())
)
> # Listing 9.11 Split-apply-combing in PySpark
gsod_map = gsod.groupby("stn", "year", "mo").applyInPandas(
scale_temperature,
schema=(
"stn string, year string, mo string, "
"da string, temp double, temp_norm double"),
)
gsod_map.show(5, False)
> # Here show() fails to display the expected results
> # Listing 9.12 Moving one station, one month’s worth of data into a local pandas DataFrame
gsod_local = gsod.where(
"year = '2018' and mo = '08' and stn = '710920'"
).toPandas()
print(
rate_of_change_temperature.func(
gsod_local["da"], gsod_local["temp_norm"]
)
)
> # It gives this error:
> #   File "C:\bigdatasetup\anaconda3\envs\pyspark-env\lib\site-packages
> #   \pandas\core\indexes\base.py", line 3623, in get_loc
> #     raise KeyError(key) from err
> # KeyError: 'temp_norm'
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
