Easy way to access S3 buckets with Spark
It's sometimes difficult to access S3 files in Apache Spark if you don't use a prebuilt environment like Zeppelin, Glue notebooks, Hue, Databricks notebooks or other alternatives, and googling around often gets you half-working solutions. But do not worry, I'll show you how easy it is to do with Spark 3.3.x.
1. Getting the right package
In order to access S3, a version of Hadoop (yes, that thing you keep hearing about even though it's over 10 years old and of almost no direct use anymore) greater than 3 is needed. More precisely, in our case we need its AWS module, hadoop-aws. You can visit the MVN Repository to get the latest version.
Another option, better suited to interactive environments like a Jupyter notebook, is to declare it in the app config, as shown in the latest blog post: Loading packages in PySpark Jupyter notebook/lab.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("myapp") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .getOrCreate()
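Note that the hadoop-aws version should match the Hadoop version your Spark build ships with; mismatched versions are a common source of confusing classpath errors. A quick way to check the bundled version from PySpark, as a sketch that reaches into the JVM through py4j internals (so treat it as unofficial):

# Print the Hadoop version bundled with the running Spark session.
# Assumes the `spark` session created above; uses the py4j gateway (_jvm).
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())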
2. Using the correct URI
Usually, S3 object identifiers start with s3://, but in this case an extra "a" is needed: s3a://...
If not, the following error will arise: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
Error stack
Traceback (most recent call last):
File "stdin", line 1, in module
File "/home/XXX/spark-3.1.1-amzn/python/pyspark/sql/readwriter.py", line 458, in parquet
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
File "/home/XXX/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/home/XXX/spark-3.1.1-amzn/python/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/home/XXX/anaconda3/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o35.parquet.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3336)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3356)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3407)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3375)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:486)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:376)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:834)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
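With the hadoop-aws package loaded and the s3a scheme in place, reading from S3 looks like any other Spark read. A minimal sketch, where the bucket and prefix are made-up placeholders:

# Read a Parquet dataset through the S3A filesystem.
# "my-bucket" and "some/prefix" are hypothetical placeholders.
df = spark.read.parquet("s3a://my-bucket/some/prefix/")
df.show(5)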
3. Configure Spark to use the ~/.aws/credentials file
If you don't want to add your AWS access key ID and secret key manually in plain text (NOTE: YOU SHOULD NOT), let Spark use the profiles you have configured via the AWS CLI. To do so, we need to add one last configuration (which, IMO, should be the default behavior).
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("myapp") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
    .getOrCreate()

# OR, if Spark is already initialised
spark.conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
If you don't do it, this nasty error will appear:
java.nio.file.AccessDeniedException: data-airflow-staging: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
Error stack
2/09/27 12:13:27 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
Traceback (most recent call last):
File "stdin", line 1, in module
File "/home/marti/spark-3.1.1-amzn/python/pyspark/sql/readwriter.py", line 458, in parquet
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
File "/home/XXX/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/home/XXX/spark-3.1.1-amzn/python/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/home/XXX/anaconda3/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o40.parquet.
: java.nio.file.AccessDeniedException: data-airflow-staging: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:187)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:375)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:311)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3358)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3407)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3375)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:486)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:376)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:834)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:159)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1166)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4368)
at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5129)
at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5103)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4352)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4315)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1344)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1284)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:376)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 30 more
Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:164)
at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
... 47 more
Caused by: java.net.NoRouteToHostException: No route to host (Host unreachable)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
at java.base/java.net.Socket.connect(Socket.java:609)
at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:508)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:603)
at java.base/sun.net.www.http.HttpClient.<init>(HttpClient.java:276)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:375)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:396)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1253)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1232)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1015)
at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:116)
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:87)
at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:189)
at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
... 50 more
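Putting the three steps together, a complete sketch might look like the following (the bucket name and path are hypothetical, and it assumes a default profile exists in ~/.aws/credentials or that the machine has an EC2 instance role):

from pyspark.sql import SparkSession

# 1. Pull in hadoop-aws and 2. let the default AWS credential chain pick up
# the ~/.aws/credentials profile (or an EC2 instance role).
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("myapp") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
    .getOrCreate()

# 3. Use the s3a:// scheme in the URI ("my-bucket" and the path are placeholders).
df = spark.read.parquet("s3a://my-bucket/some/prefix/")
df.printSchema()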
TL;DR
Summary:
- Add the hadoop-aws package
- Have your credentials set up in ~/.aws/credentials or EC2 roles
- Set the following configuration: fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
- Add an "a" to your object URIs: s3a://...