'PySpark error: AttributeError: 'NoneType' object has no attribute '_jvm'

I have timestamp dataset which is in format of

And I have written a udf in pyspark to process this dataset and return as Map of key values. But am getting below error message.

Dataset:df_ts_list

+--------------------+
|             ts_list|
+--------------------+
|[1477411200, 1477...|
|[1477238400, 1477...|
|[1477022400, 1477...|
|[1477224000, 1477...|
|[1477256400, 1477...|
|[1477346400, 1476...|
|[1476986400, 1477...|
|[1477321200, 1477...|
|[1477306800, 1477...|
|[1477062000, 1477...|
|[1477249200, 1477...|
|[1477040400, 1477...|
|[1477090800, 1477...|
+--------------------+

Pyspark UDF:

>>> def on_time(ts_list):
...     import sys
...     import os
...     sys.path.append('/usr/lib/python2.7/dist-packages')
...     os.system("sudo apt-get install python-numpy -y")
...     import numpy as np
...     import datetime
...     import time
...     from datetime import timedelta
...     ts = np.array(ts_list)
...     if ts.size == 0:
...             count = 0
...             duration = 0
...             st = time.mktime(datetime.now())
...             ymd = str(datetime.fromtimestamp(st).date())
...     else:
...             ts.sort()
...             one_tag = []
...             start = float(ts[0])
...             for i in range(len(ts)):
...                     if i == (len(ts)) - 1:
...                             end = float(ts[i])
...                             a_round = [start, end]
...                             one_tag.append(a_round)
...                     else:
...                             diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i])))
...                             if abs(diff.total_seconds()) > 3600:
...                                     end = float(ts[i])
...                                     a_round = [start, end]
...                                     one_tag.append(a_round)
...                                     start = float(ts[i+1])
...             one_tag = [u for u in one_tag if u[1] - u[0] > 300]
...             count = int(len(one_tag))
...             duration = int(np.diff(one_tag).sum())
...             ymd = str(datetime.datetime.fromtimestamp(time.time()).date())
...     return {'count':count,'duration':duration, 'ymd':ymd}

Pyspark code:

>>> on_time=udf(on_time, MapType(StringType(),StringType()))
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show()

Error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 27, in on_time
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'

Any help would be appreciated!



Solution 1:[1]

Mariusz answer didn't really help me. So if you like me found this because it's the only result on google and you're new to pyspark (and spark in general), here's what worked for me.

In my case I was getting that error because I was trying to execute pyspark code before the pyspark environment had been set up.

Making sure that pyspark was available and set up before doing calls dependent on pyspark.sql.functions fixed the issue for me.

Solution 2:[2]

The error message says that in 27th line of udf you are calling some pyspark sql functions. It is line with abs() so I suppose that somewhere above you call from pyspark.sql.functions import * and it overrides python's abs() function.

Solution 3:[3]

Just to be clear the problem a lot of guys are having is stemming from a single bad programming style. That is from blah import *

When you guys do

from pyspark.sql.functions import *

you overwrite a lot of python builtins functions. I strongly recommending importing functions like

import pyspark.sql.functions as f
# or 
import pyspark.sql.functions as pyf

Solution 4:[4]

Make sure that you are initializing the Spark context. For example:

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("...") \
    .getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("com.mongodb.spark.sql").load()

Or as in

spark = SparkSession.builder.appName('company').getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("csv").option("delimiter", ",") \
    .option("quote", "\"").option("escape", "\"") \
    .option("header", "true").option("inferSchema", "true") \
    .load("/path/thecsv.csv")

Solution 5:[5]

This exception also arises when the udf can not handle None values. For example the following code results in the same exception:

get_datetime = udf(lambda ts: to_timestamp(ts), DateType())
df = df.withColumn("datetime", get_datetime("ts"))

However this one does not:

get_datetime = udf(lambda ts: to_timestamp(ts) if ts is not None else None, DateType())
df = df.withColumn("datetime", get_datetime("ts"))

Solution 6:[6]

I found this error in my jupyter notebook. I added the below commands import findspark findspark.init() sc = pyspark.SparkContext(appName="")

and it worked. its the same problem of spark context not ready or Stopped.

Solution 7:[7]

I faced the same issue, when I had python's round() function in my code and like @Mariusz said python's round() function got overridden.

The workaround for this was to use __builtin__.round() instead of round() like @Mariusz mentions in the comments in his answer.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Mariusz
Solution 3 SARose
Solution 4 Gustavo Frederico
Solution 5 dsalaj
Solution 6 Mayuresh Dhawan
Solution 7 Luke