Issue executing a pyspark udf for simple time series forecasting
I'm writing a pyspark (2.4.8) job to perform time series predictions with a large number of models. I have a pandas df containing the model_ids and the paths where the corresponding pickle files are stored. I need to generate predictions using all of these models.
My approach is to first convert the pandas df to a spark df, and then execute a udf per model-artifact path to get the predictions, which are a simple list of integers.
Below is my high-level code:
import pyspark.sql.functions as F
from pyspark.sql.types import (
    ArrayType, IntegerType, StringType, StructField, StructType
)

def inference_arima_with_udf(spark_session, model_metadata_pandas_df, start_date, end_date):
    def get_preds(model_path, num_steps):
        # load model from model_metadata
        model = load_model(model_path)
        predictions = model.predict(num_steps)  # returns a numpy array of ints
        return predictions

    def get_schema():
        schema = StructType(
            [
                StructField("model_id", IntegerType(), True),
                StructField("model_artifacts_path", StringType(), True)
            ]
        )
        return schema

    num_steps = end_date - start_date + 1
    model_metadata_spark_df = spark_session.createDataFrame(model_metadata_pandas_df, get_schema())
    get_preds_udf = F.udf(lambda mp: get_preds(mp, num_steps), ArrayType(IntegerType()))
    model_metadata_spark_df = model_metadata_spark_df.withColumn(
        'forecasts',
        get_preds_udf(F.col('model_artifacts_path'), F.lit(num_steps))
    )
    return model_metadata_spark_df
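A note on where the error comes from (a minimal plain-Python reproduction, no Spark required): `F.udf` wraps `lambda mp: get_preds(mp, num_steps)`, which accepts a single parameter, but the resulting UDF is then invoked with two columns (`F.col('model_artifacts_path')` and `F.lit(num_steps)`), so Spark ends up calling the lambda with two arguments:

```python
# One-parameter lambda, mirroring the function wrapped by get_preds_udf.
f = lambda mp: mp * 2

# Calling it with two arguments, mirroring the two columns passed to the UDF,
# raises the same TypeError that Spark surfaces from the worker.
try:
    f("some/model/path", 7)
except TypeError as e:
    print(e)  # <lambda>() takes 1 positional argument but 2 were given
```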
Problem is, once I execute this function, it returns this error:
TypeError: &lt;lambda&gt;() takes 1 positional argument but 2 were given
I've spent quite some time on this, but no luck so far. Any help to point me in the right direction is much appreciated.
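For readers hitting the same error, here is a hedged sketch of the two usual fixes: either give the lambda an arity that matches the number of columns passed to the UDF, or close over `num_steps` and pass only the path column. The `get_preds` below is a stand-in stub (the real one loads a pickled model), and the model path is hypothetical:

```python
# Stand-in for the question's get_preds; it just returns num_steps integers
# so the call shapes can be checked without Spark or a real model file.
def get_preds(model_path, num_steps):
    return list(range(num_steps))

num_steps = 5

# Fix A: two-parameter lambda, matching the two columns passed to the UDF:
#   F.udf(lambda mp, n: get_preds(mp, n), ArrayType(IntegerType()))
#   ... get_preds_udf(F.col('model_artifacts_path'), F.lit(num_steps))
fix_a = lambda mp, n: get_preds(mp, n)

# Fix B: keep the one-parameter lambda (num_steps is captured by the
# closure) and pass only the path column:
#   ... get_preds_udf(F.col('model_artifacts_path'))
fix_b = lambda mp: get_preds(mp, num_steps)

print(fix_a("models/arima.pkl", num_steps))  # [0, 1, 2, 3, 4]
print(fix_b("models/arima.pkl"))             # [0, 1, 2, 3, 4]
```

Separately, since `model.predict` returns a numpy array, the UDF may also need to convert it (e.g. `predictions.tolist()`) so the values are plain Python ints, which is what `ArrayType(IntegerType())` expects.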
Sources
Source: Stack Overflow, licensed under CC BY-SA 3.0.