'Error when joining dataframes using AWS Glue Container
I tried joining two sample dataframes using the code below :
from pyspark import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://bayu-wbi-test/customers.csv"]}, format = "csv")
DF1 = inputDF.toDF()
DF2 = inputDF.toDF()
DoubleDF = DF1.join(DF2,DF1.col0 == DF2.col0)
DoubleDF.show()
however i encounter this error when i run it in my Glue container :
An error was encountered:
An error occurred while calling o135.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 4 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:266) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Stream is corrupted at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:772) at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:845) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read(BufferedInputStream.java:265) at java.io.DataInputStream.readInt(DataInputStream.java:387) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:129) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110) at scala.collection.Iterator$$anon$11.next(Iterator.scala:494) at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29) at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40) at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:351) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at
This container is running on my local machine and i've tried increasing the spark driver memory. Thanks for the help.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
