Can't build docs in PySpark project without Spark session running

I have a Python package with a module containing UDFs to be used in a PySpark setting. I've worked out a way to initialize and shut down a Spark session when running the unit tests, but I'm having an issue building the docs. I'm using Sphinx, so I'm simply running make clean docs and running into this error:

  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/pandas/functions.py", line 432, in _create_pandas_udf
    return _create_udf(f, returnType, evalType)
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 43, in _create_udf
    return udf_obj._wrapped()
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 206, in _wrapped
    wrapper.returnType = self.returnType
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 96, in returnType
    self._returnType_placeholder = _parse_datatype_string(self._returnType)
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 843, in _parse_datatype_string
    raise e
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 833, in _parse_datatype_string
    return from_ddl_schema(s)
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 825, in from_ddl_schema
    sc._jvm.org.apache.spark.sql.types.StructType.fromDDL(type_str).json())
AttributeError: 'NoneType' object has no attribute '_jvm'

The error is, of course, caused by trying to load a module containing a UDF defined like this:

import pyspark.sql.functions as F

@F.pandas_udf(returnType='string')
def some_udf(col):
    return col

As I understand it, the issue is that the pandas_udf can't be created because there is no Spark session available when I run make clean docs. This library is imported and used in our Databricks cluster, so the Spark session already exists by the time the library is loaded. I've made some other UDFs that accept both static values and column arguments, like this:

import pyspark.sql.functions as F

def outer_function(column_arg, integer_arg):

    # The inner pandas_udf closes over integer_arg, so only the column
    # argument goes through Spark; the UDF is only created when this
    # outer function is actually called.
    @F.pandas_udf(returnType='string')
    def inner_function(column_arg):
        return do_something_with(column_arg, integer_arg)

    return inner_function(column_arg)

Should I refactor all my UDFs to work like this? It seems like overkill.



Solution 1:

You may be able to avoid the error by changing how returnType is passed to the decorator on some_udf. The error arises from this decorator call:

@F.pandas_udf(returnType='string')

Parsing the string 'string' into a PySpark data type is what causes the error: as the traceback shows, _parse_datatype_string goes through the JVM (sc._jvm), which only exists once a Spark session is running. If you supply the data type directly as StringType(), there is nothing to parse and the error goes away. Modify the decorator as shown below:

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Avoid the need to parse a 'string' into StringType()
@F.pandas_udf(returnType=StringType())
def some_udf(col):
    return col

In my tests, executing the above snippet in Python doesn't require a Spark session at all.
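A quick way to verify this yourself, assuming PySpark 3.x (where SparkSession.getActiveSession() is available) with pandas and pyarrow installed, is to run the following right after defining some_udf above:

from pyspark.sql import SparkSession

# The UDF was defined successfully, yet no Spark session has ever been
# created in this process.
assert SparkSession.getActiveSession() is None

# The return type was resolved without any JVM call.
print(some_udf.returnType)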

If you have multiple instances of similar calls of the form F.pandas_udf(returnType='string'), in which a string (e.g. 'string', 'integer') is parsed into a PySpark data type, then all of those instances need to be replaced with the corresponding data type objects (e.g. StringType(), IntegerType()), as in the sketch below.
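For example, a hypothetical UDF declared with returnType='integer' would be rewritten like this (length_udf and its body are purely illustrative):

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# Before (needs a running session):  @F.pandas_udf(returnType='integer')
# After (no string parsing involved):
@F.pandas_udf(returnType=IntegerType())
def length_udf(col: pd.Series) -> pd.Series:
    return col.str.len()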

There are other functions besides F.pandas_udf that parse a string into a PySpark data type, and calls to any of them would contribute to the same _jvm AttributeError you are seeing. Those calls also need to be modified to avoid string parsing; one likely candidate is shown below.
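The plain F.udf decorator is one such candidate, since it accepts the same kind of type strings; assuming it resolves the string when the decorator is applied, the substitution is the same (plus_one is just an illustrative name):

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# Before:  @F.udf('integer')  -- the string would be parsed into a data type
# After:   pass the type object instead
@F.udf(returnType=IntegerType())
def plus_one(x):
    return None if x is None else x + 1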

Sidebar: as an example, the pyspark.sql.SparkSession.createDataFrame function also accepts a string that gets parsed into PySpark data types, though in this case you explicitly need a SparkSession object before you can even call createDataFrame.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('PySparkShell').getOrCreate()

# 'x INT, y STRING' is parsed to pyspark data types
df = spark.createDataFrame([(x, str(x)) for x in range(5)], 'x INT, y STRING')  

df.dtypes
# [('x', 'int'), ('y', 'string')]
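
For completeness, the same schema can be spelled out with explicit type objects rather than a DDL string, mirroring the StringType() substitution above; this sketch reuses the spark session from the previous snippet:

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Same schema built from explicit type objects instead of 'x INT, y STRING';
# nothing here needs to be parsed.
schema = StructType([
    StructField('x', IntegerType()),
    StructField('y', StringType()),
])
df = spark.createDataFrame([(x, str(x)) for x in range(5)], schema)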

Ideally, F.pandas_udf would also explicitly require a SparkSession object before letting unsuspecting users supply type information as a yet-to-be-parsed string (e.g. 'string'), but, sadly, it doesn't.
