PySpark: Error occurred while calling o43.collectAsArrowToPython

I'm trying to resolve a bug in a local Spark environment on AWS SageMaker that I believe comes down to version issues. Running the pyarrow.py example from the Spark repo throws the error shown at the bottom of this post.

I've tested creating Spark DataFrames this way and performing transformations on them, and those work fine. The error only appears when converting back to pandas.
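
For example, something like the following runs without a problem (illustrative only, not my exact code; the column name assumes the DataFrame built from np.random.rand below, whose integer column labels Spark converts to strings):

df = sql_ctx.createDataFrame(pdf)
df.filter(df["0"] > 0.5).count()  # transformation + action on the JVM side: works
df.toPandas()                     # only this Arrow-backed conversion raises the error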

My environment:

java-1.8.0-openjdk
pandas==1.1.4
pyspark==2.3.4
pyarrow==0.17.1
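
A quick way to confirm these are the versions the kernel actually sees at runtime (each of these packages exposes a standard __version__ attribute):

import pandas, pyarrow, pyspark
print("pandas ", pandas.__version__)
print("pyarrow", pyarrow.__version__)
print("pyspark", pyspark.__version__)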

My code:

import os

import numpy as np
import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

def get_spark_config():
    os.environ['JAVA_HOME'] = 'path_to_openjdk'
    return SparkConf()\
        .setAppName("appName")\
        .setMaster('local[*]')\
        .set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.2")\
        .set("spark.sql.execution.arrow.enabled", "true")

def get_spark_context(conf):
    return SparkContext(conf=conf)

def get_sql_context(sc):
    return SQLContext(sc)

def simple_spark():
    conf = get_spark_config()
    sc = get_spark_context(conf)
    sql_ctx = get_sql_context(sc)
    
    ### Begin code from Spark repo ###
    # Generate a Pandas DataFrame
    pdf = pd.DataFrame(np.random.rand(100, 3))

    # Create a Spark DataFrame from a Pandas DataFrame using Arrow
    df = sql_ctx.createDataFrame(pdf)

    # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
    result_pdf = df.select("*").toPandas()
    
    return result_pdf

if __name__ == "__main__":
    simple_spark()
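
For what it's worth, the note at the bottom of the traceback suggests disabling the Arrow optimization as a fallback, which presumably makes toPandas take the slower non-Arrow path (a sketch; I'd rather fix the underlying version mismatch than turn Arrow off):

# Fallback sketch: same config, but with the Arrow optimization disabled
conf = get_spark_config().set("spark.sql.execution.arrow.enabled", "false")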

The error:

RuntimeError                              Traceback (most recent call last)
<ipython-input-2-e83fbd1bb421> in <module>
----> 1 simple_spark()
    645 
    646     # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
--> 647     result_pdf = df.select("*").toPandas()
    648 
    649     return result_pdf

~/anaconda3/envs/JupyterSystemEnv/lib/python3.6/site-packages/pyspark/sql/dataframe.py in toPandas(self)
   1964                     "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
   1965                     "to disable this.")
-> 1966                 raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
   1967         else:
   1968             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)

RuntimeError: An error occurred while calling o43.collectAsArrowToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 7, localhost, executor driver): java.lang.NullPointerException
        at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:256)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:242)
        at org.apache.arrow.vector.ipc.ArrowFileReader.readRecordBatch(ArrowFileReader.java:162)
        at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:113)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.nextBatch(ArrowConverters.scala:170)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:138)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$.fromPayloadIterator(ArrowConverters.scala:135)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:211)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:209)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1661)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1649)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1648)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1648)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1882)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1820)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1.apply(Dataset.scala:3217)
        at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1.apply(Dataset.scala:3215)
        at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3265)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3264)
        at org.apache.spark.sql.Dataset.collectAsArrowToPython(Dataset.scala:3215)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:256)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:242)
        at org.apache.arrow.vector.ipc.ArrowFileReader.readRecordBatch(ArrowFileReader.java:162)
        at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:113)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.nextBatch(ArrowConverters.scala:170)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:138)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$.fromPayloadIterator(ArrowConverters.scala:135)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:211)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:209)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more

Note: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true. Please set it to false to disable this.

