Glue job failed with "No space left on device" or "ArrayIndexOutOfBoundsException" when writing a huge data frame
I have a Glue job that:
- creates dynamic frames from several data catalogs
- converts them to Spark DataFrames
- joins 4 DataFrames and performs an aggregation
- writes the result to S3 as CSV/Parquet files
It ran without problems on a medium-sized data source (about 20 GB in total) using G.1X workers, 20 workers, with an execution time of 40 minutes. But when the data volume increased to 60 GB in total, using G.2X workers and 50 workers, the execution time rose to 4-6 hours and the job ended with this error:
21/08/07 16:41:27 ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
File "/tmp/test-deal-stepfun.py", line 213, in <module>
df.coalesce(1).write.partitionBy("log_date_syd").mode("overwrite").csv(args['DEST_FOLDER'])
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 927, in csv
self._jwrite.csv(path)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o253.csv.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: java.lang.ArrayIndexOutOfBoundsException: 0
java.lang.ArrayIndexOutOfBoundsException: 0
at org.apache.spark.scheduler.CompressedMapStatus.getSizeForBlock(MapStatus.scala:119)
at org.apache.spark.MapOutputTrackerMaster$$anonfun$getLocationsWithLargestOutputs$1.apply(MapOutputTracker.scala:612)
at org.apache.spark.MapOutputTrackerMaster$$anonfun$getLocationsWithLargestOutputs$1.apply(MapOutputTracker.scala:599)
at org.apache.spark.ShuffleStatus.withMapStatuses(MapOutputTracker.scala:192)
at org.apache.spark.MapOutputTrackerMaster.getLocationsWithLargestOutputs(MapOutputTracker.scala:599)
at org.apache.spark.MapOutputTrackerMaster.getPreferredLocationsForShuffle(MapOutputTracker.scala:568)
at org.apache.spark.sql.execution.ShuffledRowRDD.getPreferredLocations(ShuffledRowRDD.scala:152)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:275)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:275)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
...
BTW, I have already set these job parameters to optimize memory and disk usage:
- "--conf: spark.executor.memory=20g --conf: spark.driver.memory=20g --conf: spark.driver.memoryOverhead=10g --conf: spark.executor.memoryOverhead=10g" to give the Spark driver and executors more memory.
- "--write-shuffle-files-to-s3: true" to redirect intermediate shuffle files to S3 and free up disk space on the worker nodes.
- In the job script, set S3 retries:
conf = SparkConf()
conf.set("spark.hadoop.fs.s3.maxRetries","20").set("spark.hadoop.fs.s3.sleepTimeSeconds","30")
- In the job script, add these options when creating the dynamic frames:
"useS3ListImplementation": True,
"groupFiles": "InPartition",
"groupSize": "10485760"
- Optimized the Spark job code: dropped unused columns before the join, and applied distinct for the left join.
The errors are either "No space left on device" or "ArrayIndexOutOfBoundsException" during the write. The metrics pattern: [image: metrics]
How can I avoid failures when writing a huge amount of data in a Glue job? Thanks a lot!
Solution 1:[1]
I recently encountered this same issue while running an AWS Glue job configured to use S3 for shuffle. In my case, the problem was that I had set spark.shuffle.glue.s3ShuffleBucket incorrectly. Once I fixed my job parameters so that the key was --conf and the value was spark.shuffle.glue.s3ShuffleBucket=s3://mybucket/mypath, it worked.
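As an illustration only (this is not code from the answer), the key/value split described above could be sketched in Python as follows. The helper names build_glue_arguments and start_job are hypothetical, the bucket path is the placeholder from the answer and is assumed to be an s3:// URI, and chaining further settings inside the single --conf value is a commonly used workaround that is treated here as an assumption rather than documented behavior.

```python
from typing import Dict, Optional


def build_glue_arguments(shuffle_bucket: str,
                         extra_spark_conf: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Build Glue job arguments that enable S3-backed shuffle (hypothetical helper).

    Each Glue job parameter is one key/value pair, so "--conf" is the key and
    "spark.shuffle.glue.s3ShuffleBucket=<path>" is the value.
    """
    # Reject malformed paths early; a mistyped scheme would otherwise go unnoticed.
    if not shuffle_bucket.startswith("s3://"):
        raise ValueError(f"shuffle bucket must be an s3:// URI, got {shuffle_bucket!r}")

    settings = {"spark.shuffle.glue.s3ShuffleBucket": shuffle_bucket}
    settings.update(extra_spark_conf or {})

    # Assumption: extra settings are chained inside the single "--conf" value,
    # because a parameter key can appear only once per job.
    conf_value = " --conf ".join(f"{key}={value}" for key, value in settings.items())

    return {
        "--write-shuffle-files-to-s3": "true",
        "--conf": conf_value,
    }


def start_job(job_name: str, arguments: Dict[str, str]) -> str:
    """Start a Glue job run with the given arguments and return its run id."""
    import boto3  # imported lazily so build_glue_arguments has no AWS dependency

    glue = boto3.client("glue")
    response = glue.start_job_run(JobName=job_name, Arguments=arguments)
    return response["JobRunId"]
```

Calling build_glue_arguments("s3://mybucket/mypath", {"spark.executor.memory": "20g"}) yields a dictionary whose --conf entry starts with the shuffle bucket setting. It can be passed to start_job together with the name of an existing job, or the same key and value can be entered by hand in the job parameters section of the console.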
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Josh Axtell |
