Is there a way to configure the memory resources for Spark using PySpark?

I'm working on an ETL job in a SageMaker notebook that uses Spark 2.4.0. After joining a couple of tables I keep getting the following errors:

Update: I was able to set up this configuration:

    import pyspark

    conf = pyspark.SparkConf().setAll([('spark.executor.memory', '12g'),
                                       ('spark.executor.cores', '10'),
                                       ('spark.cores.max', '8'),
                                       ('spark.driver.memory', '8g'),
                                       ('spark.yarn.executor.memoryOverhead', '11'),
                                       ('spark.num.Executors', '15'),
                                       ('spark.driver.cores', '10'),
                                       ('spark.executor.instances', '94'),
                                       ('spark.default.parallelism', '17860'),
                                       ('spark.app.name', 'Spark Updated Conf'),
                                       ('spark.cleaner.periodicGC.interval', '10min')])

I'm using a very large instance to make this work; however, even with this configuration I'm still getting memory errors. I now get a stack trace similar to the ones listed below, but for: java.lang.OutOfMemoryError: GC overhead limit exceeded

Py4JJavaError: An error occurred while calling o8027.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 38 in stage 68.0 failed 1 times, most recent failure: Lost task 38.0 in stage 68.0 (TID 2900, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
 
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space

Py4JJavaError: An error occurred while calling o5928.showString.
: org.apache.spark.SparkException: Job 30 cancelled because SparkContext was shut down
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:932)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:930)
       at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
       at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:930)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2126)
       at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
       at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2039)
       at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
       at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
       at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
       at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:575)
       at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
       at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
       at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
       at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
       at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
       at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
       at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
       at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
       at scala.util.Try$.apply(Try.scala:192)
       at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
       at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)
       at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
       at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
       at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
       at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
       at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
       at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
       at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
       at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
       at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
       at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
       at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
       at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
       at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
       at py4j.Gateway.invoke(Gateway.java:282)
       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
       at py4j.commands.CallCommand.execute(CallCommand.java:79)
       at py4j.GatewayConnection.run(GatewayConnection.java:238)
       at java.lang.Thread.run(Thread.java:748)
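
One detail worth pointing out in the traces above: the failed task is reported as "Lost task 38.0 ... (TID 2900, localhost, executor driver)", which is how Spark labels tasks when it runs in local mode. If that is the case here, there are no separate executor JVMs at all, so spark.executor.memory, spark.executor.instances and the YARN overhead settings have no effect; the whole job runs inside the driver JVM and spark.driver.memory is what bounds the heap. A minimal sketch of starting a local session with a larger driver heap follows; the values are placeholders rather than recommendations, and they too must be set before the driver JVM is launched:

    from pyspark.sql import SparkSession

    # In local mode everything runs inside the driver JVM, so the driver heap
    # is the setting that matters. The values below are illustrative only.
    spark = (SparkSession.builder
             .master("local[*]")
             .appName("Spark Updated Conf")
             .config("spark.driver.memory", "20g")
             .config("spark.driver.maxResultSize", "4g")
             .getOrCreate())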

I've used this memory configuration for Spark that is demonstrated within EMRKS in AWS. I was able to set it up on my SageMaker notebook, but Spark doesn't seem to pick it up.

Is there a way to configure the memory resources that Spark is using while performing the computations in the back end?
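
One quick way to confirm whether any of these settings were actually picked up is to read them back from the live context. A small check, assuming the session is bound to the usual spark variable:

    # Print the memory- and executor-related settings the running context
    # actually has; anything missing here (or at its default) was not applied.
    for key, value in spark.sparkContext.getConf().getAll():
        if any(s in key for s in ('memory', 'cores', 'instances', 'parallelism')):
            print(key, '=', value)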



Solution 1:[1]

It turns out that Spark version 2.4.0 is not supported on Amazon SageMaker notebooks as of today. There is an open ticket regarding this issue on GitHub.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: Wendy Velasquez