An easy way to access S3 buckets with Spark

It's sometimes difficult to access S3 files from Apache Spark if you aren't using a prebuilt environment like Zeppelin, Glue Notebooks, HUE, Databricks Notebooks or similar alternatives, and googling around often yields half-working solutions. But don't worry: I'll show you how easy it is to do with Spark 3.3.x.

📜 TL;DR at the end

1. Getting the right package

In order to access S3, you need Hadoop (yes, that thing you keep hearing about even though it's over a decade old and of almost no direct use anymore) in a version greater than 3, and in our case specifically its AWS-flavored module, hadoop-aws. You can visit the MVN repository to get the latest version.

pyspark --packages org.apache.hadoop:hadoop-aws:3.3.4
# OR
spark-shell --packages org.apache.hadoop:hadoop-aws:3.3.4
Requesting the package via the CLI

Another option, better suited to interactive environments like a Jupyter notebook, is to declare it in the app config, as shown in the previous blog post: Loading packages in pyspark jupyter notebook/lab.

from pyspark.sql import SparkSession

# spark.jars.packages pulls hadoop-aws (and its dependencies) when the session starts
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("myapp") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .getOrCreate()

2. Using the correct URI

S3 object identifiers usually start with s3://, but in this case an extra "a" is needed: s3a://... . If not, the following error will arise: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"

Error stack


Traceback (most recent call last):
File "stdin", line 1, in module
File "/home/XXX/spark-3.1.1-amzn/python/pyspark/sql/readwriter.py", line 458, in parquet
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
File "/home/XXX/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/home/XXX/spark-3.1.1-amzn/python/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/home/XXX/anaconda3/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o35.parquet.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3336)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3356)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3407)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3375)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:486)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:376)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:834)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
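
For reference, once hadoop-aws is loaded, a read with the correct scheme looks like this (a minimal sketch; the bucket and prefix are hypothetical):

# assumes the SparkSession built in step 1, with hadoop-aws on the classpath
df = spark.read.parquet("s3a://my-bucket/path/to/data/")  # note s3a://, not s3://
df.show(5)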

3. Configure Spark to use the ~/.aws/credentials file

If you don't want to add your AWS access key ID and secret manually in plain text (NOTE: YOU SHOULD NOT), let Spark use the profiles you have already configured via the AWS CLI. To do so, we need to add one last configuration (which IMO should be the default behavior).

pyspark --packages org.apache.hadoop:hadoop-aws:3.3.4 --conf "spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
# OR
spark-shell --packages org.apache.hadoop:hadoop-aws:3.3.4 --conf "spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
Requesting the package and the credentials provider via the CLI (the spark.hadoop. prefix makes Spark forward the property to the Hadoop configuration)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("myapp") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")\
    .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")\
    .getOrCreate()

# OR, if spark is already initialised
spark.conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
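
The DefaultAWSCredentialsProviderChain reads ~/.aws/credentials as well as the standard AWS environment variables and instance roles. If you need a specific named profile from your credentials file, one option is to set AWS_PROFILE before the session starts; a minimal sketch (the profile name my-profile is hypothetical, and this relies on the driver JVM inheriting the Python process environment, e.g. in local mode):

import os
from pyspark.sql import SparkSession

# Hypothetical profile name; it must exist in ~/.aws/credentials.
# Set it before getOrCreate() so the AWS SDK inside the Spark JVM can see it.
os.environ["AWS_PROFILE"] = "my-profile"

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("myapp") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
    .getOrCreate()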

Without a valid credentials configuration, this nasty error will appear:

java.nio.file.AccessDeniedException: data-airflow-staging: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint

Error stack



2/09/27 12:13:27 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
Traceback (most recent call last):
File "stdin", line 1, in module
File "/home/marti/spark-3.1.1-amzn/python/pyspark/sql/readwriter.py", line 458, in parquet
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
File "/home/XXX/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/home/XXX/spark-3.1.1-amzn/python/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/home/XXX/anaconda3/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o40.parquet.
: java.nio.file.AccessDeniedException: data-airflow-staging: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:187)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:375)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:311)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3358)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3407)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3375)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:486)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:376)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:834)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:159)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1166)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4368)
at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5129)
at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5103)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4352)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4315)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1344)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1284)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:376)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 30 more
Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:164)
at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
... 47 more
Caused by: java.net.NoRouteToHostException: No route to host (Host unreachable)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
at java.base/java.net.Socket.connect(Socket.java:609)
at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:508)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:603)
at java.base/sun.net.www.http.HttpClient.<init>(HttpClient.java:276)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:375)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:396)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1253)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1232)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1015)
at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:116)
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:87)
at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:189)
at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
... 50 more
        

TL;DR

Summary:

  1. Add the hadoop-aws package.
  2. Have your credentials set up in ~/.aws/credentials or via EC2 roles.
  3. Set the following configuration: fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
  4. Add an a to your object URIs: s3a://....
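
Putting it all together, a minimal sketch (the bucket name and prefix are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("myapp") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
    .getOrCreate()

# hypothetical bucket and prefix; note the s3a:// scheme
df = spark.read.parquet("s3a://my-bucket/some/prefix/")
df.printSchema()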