PySpark: Error occurred while calling o43.collectAsArrowToPython
I'm trying to resolve a bug in a local Spark environment on AWS SageMaker that I believe comes down to version incompatibilities. Running the pyarrow.py example from the Spark repo throws the error shown at the bottom of this post.
I've tested creating Spark DataFrames this way and performing transformations on them, and those work fine. The error only appears when converting back to pandas.
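As a quick check that the failure is specific to the Arrow path, the optimization can be toggled off so the conversion falls back to the slower row-based code. A minimal sketch, assuming the sql_ctx and pdf objects from the code below:

# Diagnostic only: with Arrow disabled for both directions, the round trip
# goes through the non-Arrow fallback and should succeed if Arrow is the culprit.
sql_ctx.setConf("spark.sql.execution.arrow.enabled", "false")
df_no_arrow = sql_ctx.createDataFrame(pdf)           # plain (non-Arrow) conversion
pdf_roundtrip = df_no_arrow.select("*").toPandas()   # expected to succeed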
My environment:
java-1.8.0-openjdk
pandas==1.1.4
pyspark==2.3.4
pyarrow==0.17.1
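Just to rule out a mismatched kernel, a quick runtime check that the notebook resolves the same versions listed above (a trivial sketch, nothing specific to this bug):

import pandas as pd
import pyarrow
import pyspark

# Print the versions the running interpreter actually imports.
print("pyspark:", pyspark.__version__)   # expect 2.3.4
print("pyarrow:", pyarrow.__version__)   # expect 0.17.1
print("pandas:", pd.__version__)         # expect 1.1.4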
My code:
import os

import numpy as np
import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

def get_spark_config():
    os.environ['JAVA_HOME'] = 'path_to_openjdk'
    return SparkConf()\
        .setAppName("appName")\
        .setMaster('local[*]')\
        .set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.2")\
        .set("spark.sql.execution.arrow.enabled", "True")

def get_spark_context(conf):
    return SparkContext(conf=conf)

def get_sql_context(sc):
    return SQLContext(sc)

def simple_spark():
    conf = get_spark_config()
    sc = get_spark_context(conf)
    sql_ctx = get_sql_context(sc)

    ### Begin code from Spark repo ###
    # Generate a Pandas DataFrame
    pdf = pd.DataFrame(np.random.rand(100, 3))

    # Create a Spark DataFrame from a Pandas DataFrame using Arrow
    df = sql_ctx.createDataFrame(pdf)

    # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
    result_pdf = df.select("*").toPandas()
    return result_pdf

if __name__ == "__main__":
    simple_spark()
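If this does come down to versions, the likely suspect is the Arrow IPC format change introduced in pyarrow 0.15.0: Spark 2.3.x bundles a much older Arrow Java library (0.8.0), which is where the NullPointerException below originates when it tries to read the newer format. The Spark docs describe a compatibility setting for Spark 2.3.x/2.4.x with pyarrow >= 0.15.0; a sketch of applying it here, assuming it still takes effect on pyarrow 0.17.1:

import os

# Compatibility setting from the Spark 2.3.x/2.4.x docs: ask pyarrow >= 0.15.0
# to write the legacy (pre-0.15) IPC format that Spark's bundled Arrow Java
# still understands. It must be set before any Arrow serialization happens,
# i.e. before createDataFrame/toPandas are called.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

result_pdf = simple_spark()

If that setting is ignored by this pyarrow release, the other obvious option is pinning pyarrow to a release contemporary with Spark 2.3, e.g. pip install 'pyarrow<0.15'.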
The error:
RuntimeError Traceback (most recent call last)
<ipython-input-2-e83fbd1bb421> in <module>
----> 1 simple_spark()
645
646 # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
--> 647 result_pdf = df.select("*").toPandas()
648
649 return result_pdf
~/anaconda3/envs/JupyterSystemEnv/lib/python3.6/site-packages/pyspark/sql/dataframe.py in toPandas(self)
1964 "'spark.sql.execution.arrow.enabled' is set to true. Please set it to false "
1965 "to disable this.")
-> 1966 raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
1967 else:
1968 pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
RuntimeError: An error occurred while calling o43.collectAsArrowToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 7, localhost, executor driver): java.lang.NullPointerException
at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:256)
at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:242)
at org.apache.arrow.vector.ipc.ArrowFileReader.readRecordBatch(ArrowFileReader.java:162)
at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:113)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.nextBatch(ArrowConverters.scala:170)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:138)
at org.apache.spark.sql.execution.arrow.ArrowConverters$.fromPayloadIterator(ArrowConverters.scala:135)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:211)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:209)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1661)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1649)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1648)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1648)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1882)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1820)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1.apply(Dataset.scala:3217)
at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1.apply(Dataset.scala:3215)
at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3265)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3264)
at org.apache.spark.sql.Dataset.collectAsArrowToPython(Dataset.scala:3215)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:256)
at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:242)
at org.apache.arrow.vector.ipc.ArrowFileReader.readRecordBatch(ArrowFileReader.java:162)
at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:113)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.nextBatch(ArrowConverters.scala:170)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:138)
at org.apache.spark.sql.execution.arrow.ArrowConverters$.fromPayloadIterator(ArrowConverters.scala:135)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:211)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$3.apply(ArrowConverters.scala:209)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Note: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true. Please set it to false to disable this.