Getting objects from S3 bucket using PySpark

I'm trying to get JSON objects from an S3 bucket using PySpark (on Windows, via a WSL2 terminal).

I can do this using boto3 as an intermediate step, but when I try to use the spark.read.json method I get an error.
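
For context, the boto3 route looks roughly like this (the bucket, key and region names below are placeholders, not my real ones):

import json

import boto3

# Placeholder bucket/key/region, for illustration only
s3 = boto3.client('s3', region_name='eu-west-1')
obj = s3.get_object(Bucket='my-bucket', Key='my-directory/my-object.json')
records = json.loads(obj['Body'].read())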

Code:

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os
import multiprocessing

#----------------APACHE CONFIGURATIONS--------------
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'

#---------------spark--------------
conf = (
    SparkConf()
    .set('spark.executor.extraJavaOptions', '-Dcom.amazonaws.services.s3.enableV4=true')
    .set('spark.driver.extraJavaOptions', '-Dcom.amazonaws.services.s3.enableV4=true')
    .setAppName('pyspark_aws')
    .setMaster(f"local[{multiprocessing.cpu_count()}]")
    .setIfMissing("spark.executor.memory", "2g")
)

sc = SparkContext(conf=conf)
sc.setSystemProperty('com.amazonaws.services.s3.enableV4', 'true')
spark = SparkSession(sc)
#--------------hadoop--------------
accessKeyId='xxxxxxxxxxxx'
secretAccessKey='xxxxxxxxx'

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set('fs.s3a.access.key', accessKeyId)
hadoopConf.set('fs.s3a.secret.key', secretAccessKey)
hadoopConf.set('fs.s3a.endpoint', 's3-eu-west-1.amazonaws.com')
hadoopConf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
hadoopConf.set('fs.s3a.multipart.size', '419430400')
hadoopConf.set('fs.s3a.multipart.threshold', '2097152000')
hadoopConf.set('fs.s3a.connection.maximum', '500')
hadoopConf.set('fs.s3a.connection.timeout', '600000')


s3_df = spark.read.json('s3a://{bucket}/{directory}/{object}.json')

Error:

py4j.protocol.Py4JJavaError: An error occurred while call
: java.lang.NumberFormatException: For input string: "32M
        at java.base/java.lang.NumberFormatException.forI
        at java.base/java.lang.Long.parseLong(Long.java:6
        at java.base/java.lang.Long.parseLong(Long.java:8
        at org.apache.hadoop.conf.Configuration.getLong(C
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getDefa
        at org.apache.hadoop.fs.FileSystem.getDefaultBloc
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFile
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFile
        at org.apache.hadoop.fs.FileSystem.exists(FileSys
        at org.apache.spark.sql.execution.datasources.Dat
        at org.apache.spark.sql.execution.datasources.Dat
        at org.apache.spark.util.ThreadUtils$.$anonfun$pa
        at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
        at java.base/java.util.concurrent.ForkJoinTask.do
        at java.base/java.util.concurrent.ForkJoinPool$Wor
        at java.base/java.util.concurrent.ForkJoinPool.sc
        at java.base/java.util.concurrent.ForkJoinPool.ru
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

I added the multipart.size, multipart.threshold, connection.maximum, and connection.timeout Hadoop conf settings when I was getting a similar error earlier (that earlier error showed '64M' instead of '32M', and it changed once I added these settings).

I'm new to Spark, so any and all tips/pointers would be helpful!



Solution 1:[1]

The "32M" is the default value of "fs.s3a.block.size".

If needed, try:

hadoopConf.set('fs.s3a.block.size', '33554432')

Go to https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html and you will find the explanations of the "32M" and the "64M" defaults.
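
A minimal sketch of how this could slot into the setup above, assuming sc, spark, accessKeyId and secretAccessKey are defined as in the question, with the block size given as a plain byte count:

# Assumes the SparkContext/SparkSession and credentials from the question
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set('fs.s3a.access.key', accessKeyId)
hadoopConf.set('fs.s3a.secret.key', secretAccessKey)
hadoopConf.set('fs.s3a.endpoint', 's3-eu-west-1.amazonaws.com')
# 32 MB written out in bytes; the traceback shows Configuration.getLong failing on "32M",
# so a plain number sidesteps the unit suffix on this Hadoop/hadoop-aws combination
hadoopConf.set('fs.s3a.block.size', '33554432')

s3_df = spark.read.json('s3a://{bucket}/{directory}/{object}.json')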

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: sam