PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable

I am getting this error after running the function below.

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

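The goal of this code is to create a flag for every row based on the differences between dates. The earliest date is flagged 1. Each later date is flagged 1 if it falls at least two calendar months after the most recently flagged date, and 0 otherwise. Each user has multiple rows, and all of a user's rows are passed to the function together to compute the flags.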

Does anyone know what this means and how I can fix it?

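from pyspark.sql import Window
from pyspark.sql.functions import col, udf

def selection(df):
  _i = 0
  l = []
  # Collect the sorted dates to the driver as a plain Python list
  x = df.select("Date").orderBy("Date").rdd.flatMap(lambda x: x).collect()
  rdd = spark.sparkContext.parallelize(x)  # uses the SparkContext
  l.append((x[_i], 1))

  for _j in range(_i + 1, rdd.count()):
    d1, d2 = x[_i], x[_j]  # last flagged date, current date
    # Flag the current date if it is at least 2 calendar months after d1
    if (d2.year - d1.year) * 12 + (d2.month - d1.month) >= 2:
      l.append((x[_j], 1))
    else:
      l.append((x[_j], 0))
      continue
    _i = _j  # the current date becomes the new reference date

  columns = ['Date', 'flag']
  new_df = spark.createDataFrame(l, columns)  # uses the SparkSession
  df_new = df.join(new_df, ['Date'], "inner")

  return df_new

# udf() pickles selection, and with it the global `spark` it references,
# so the function can run on the executors; pickling SparkContext is what fails
ToKeep = udf(lambda z: selection(z))
sample_new = sample.withColumn("toKeep", ToKeep(sample).over(Window.partitionBy(col("id")).orderBy(col("id"), col("Date"))))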
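The flag rule that `selection` is meant to implement can be checked in plain Python, without Spark, on a few rows. This is a minimal sketch that assumes each row is a `(user_id, date)` pair holding `datetime.date` values; `flag_rows` and `months_between` are hypothetical helper names, not part of the question's code.

```python
from datetime import date
from itertools import groupby
from operator import itemgetter


def months_between(d1, d2):
    """Whole calendar months from d1 to d2, ignoring the day of the month."""
    return (d2.year - d1.year) * 12 + (d2.month - d1.month)


def flag_rows(rows):
    """rows: iterable of (user_id, date). Returns (user_id, date, flag) tuples."""
    result = []
    # Sort by user, then by date, so each user's dates are processed in order
    for user_id, group in groupby(sorted(rows), key=itemgetter(0)):
        anchor = None  # most recently flagged date for this user
        for _, d in group:
            if anchor is None or months_between(anchor, d) >= 2:
                result.append((user_id, d, 1))
                anchor = d
            else:
                result.append((user_id, d, 0))
    return result


rows = [
    ("a", date(2021, 1, 15)),
    ("a", date(2021, 2, 10)),
    ("a", date(2021, 3, 1)),
    ("b", date(2021, 5, 5)),
    ("b", date(2021, 6, 30)),
]
for row in flag_rows(rows):
    print(row)
```

Because the reference date only moves when a row is flagged, each flag depends on earlier flags. A single `lag` over the previous row is therefore not enough to express this rule.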


Solution 1:[1]

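from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def function(x):
    # The body runs on the executors, so it cannot contain Spark operations
    # (such as spark.sql, spark.createDataFrame or sparkContext.parallelize).
    # If Spark operations are needed, move them out of the UDF and run them on the driver.
    pass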
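Following that advice, one way to restructure the question's code is to let Spark gather each `id`'s dates into an array and apply a UDF that only sees plain Python values. This is a sketch, not the answerer's code. It assumes `Date` is a `DateType` column and `id` identifies the user, and `flag_dates` and `add_flags` are hypothetical names. It uses `collect_set`, so duplicate dates for the same `id` share one flag.

```python
from datetime import date


def flag_dates(dates):
    """Flag a sorted list of dates; plain Python only, so it is safe inside a UDF."""
    flags = []
    anchor = None  # most recently flagged date
    for d in dates:
        if anchor is None or (d.year - anchor.year) * 12 + (d.month - anchor.month) >= 2:
            flags.append(1)
            anchor = d
        else:
            flags.append(0)
    return flags


def add_flags(df):
    """Return df with a 'flag' column; every Spark call here runs on the driver."""
    # PySpark is imported here; flag_dates above needs only the standard library
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, IntegerType

    flag_udf = F.udf(flag_dates, ArrayType(IntegerType()))

    flags = (
        df.groupBy("id")
        # One sorted, de-duplicated array of dates per id
        .agg(F.array_sort(F.collect_set("Date")).alias("dates"))
        # The UDF receives a Python list of datetime.date and returns a list of ints
        .withColumn("flags", flag_udf("dates"))
        # Back to one row per (id, Date), taking the flag at the same array position
        .select("id", "flags", F.posexplode("dates").alias("pos", "Date"))
        .select("id", "Date", F.col("flags")[F.col("pos")].alias("flag"))
    )
    return df.join(flags, ["id", "Date"], "inner")


# The flag logic itself can be checked without Spark:
print(flag_dates([date(2021, 1, 15), date(2021, 2, 10), date(2021, 3, 1)]))  # [1, 0, 1]

# Usage, assuming an existing DataFrame `sample` with `id` and `Date` columns:
# sample_new = add_flags(sample)
```

The UDF never references `spark` or a DataFrame, so only `flag_dates` is pickled and sent to the executors. The grouping, exploding and joining are ordinary DataFrame operations issued from the driver.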

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Henry Ecker