# help
u
Hey all. A question on talking to S3 using the Hadoop-LakeFS-assembly:
u
I've set up an AWS Glue job to talk to my LakeFS server running on EC2, using the S3A gateway, and that works great. So now I am trying to use the hadoop LakeFS assembly, the goal being to let Glue write directly to S3 using the LakeFS filesystem format, and not send data traffic through the LakeFS server. Here's the error I am getting when running the job:
```
An error occurred while calling o94.parquet. s3a://<DATA BUCKET>/data/<REPO>-<BRANCH>:a3de1af8-f7f1-4fb6-a784-72ac9c20cbb1/-nvHaNa15P3gE3S6jXr4G: getFileStatus on s3a://<DATA BUCKET>/data/<REPO>-<BRANCH>:a3fe1af8-f7f1-4fb6-aa84-72ac9c20cbb1/-nvGaNa15P4gE3S6jXr4G: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: JQJNKMESD9T3XNGE; S3 Extended Request ID: D/sNM/+LqxuX/Vm7NloIQwwY8H+bac14LaQxXnbTLGNTfPEaUhs5rOnGxgersjI6JW3t72i+KEI=; Proxy: null), S3 Extended Request ID: D/sNM/+LqxuX/Vm7NloIQwwY8H+bac14LaQxXnbTLGNTfPEaUhs5rOnGxgersjI6JW3t72i+KEI=
```
I believe I have configured everything correctly according to this: https://docs.lakefs.io/integrations/spark.html#configuration-1 In my case:
- fs.s3a.access / secret are set to IAM user credentials which have s3* allowed on the data bucket
- fs.s3a.endpoint is set to a region-specific endpoint
- fs.lakefs.access / secret are set to what I used for the S3A gateway
- fs.lakefs.endpoint is set to the API endpoint on my EC2 instance

What I'm struggling with is understanding where this error is coming from. Is it S3 or LakeFS related? We don't have AWS support, and I don't see any entries in CloudWatch / CloudTrail, nor any log entries in my lakefs log (it has log level DEBUG). The Glue job role also has s3* permission to the data bucket (just in case).

Here is the Spark code I use to attempt the write. I'm using the lakefs protocol as described in the link above.
```
output = f"lakefs://{repo}/{branch}/data/sample_data_lakefs_write/"
df.write.partitionBy(["yyyy_mm_dd", "hh_mm"]).parquet(output)
```
I've tested with versions 0.1.4 and 0.1.6 of the assembly jar, same issue.
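For completeness, this is roughly how the configuration is wired up in the Glue script. The key names follow the lakeFS Spark docs linked above; the credential, endpoint and region values here are placeholders, not my real ones:
```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext.getOrCreate()
glue_context = GlueContext(sc)
spark = glue_context.spark_session

# Hadoop configuration shared by the lakeFS filesystem and S3A.
hadoop_conf = sc._jsc.hadoopConfiguration()

# lakeFS Hadoop filesystem (from the hadoop-lakefs-assembly jar).
hadoop_conf.set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
hadoop_conf.set("fs.lakefs.access.key", "<LAKEFS_ACCESS_KEY>")
hadoop_conf.set("fs.lakefs.secret.key", "<LAKEFS_SECRET_KEY>")
hadoop_conf.set("fs.lakefs.endpoint", "http://<LAKEFS_EC2_HOST>:8000/api/v1")

# Direct S3 access used for the data itself.
hadoop_conf.set("fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>")
hadoop_conf.set("fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>")
hadoop_conf.set("fs.s3a.endpoint", "s3.<REGION>.amazonaws.com")
```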
u
I noticed that "getFileStatus" is implemented in the Hadoop Assembly Jar and that it doesn't seem to be an S3 action I could add to IAM (and anyway I gave my IAM user s3:* permission).
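My understanding (could be wrong) is that S3A's getFileStatus boils down to plain HEAD/LIST requests against the bucket, so one way to take Spark and Glue out of the picture is to issue a HEAD directly with the same fs.s3a credentials. A rough sketch; the bucket, key and region are placeholders:
```python
import boto3
from botocore.exceptions import ClientError

# Same credentials as configured under fs.s3a.* in the Glue job.
s3 = boto3.client(
    "s3",
    aws_access_key_id="<FS_S3A_ACCESS_KEY>",
    aws_secret_access_key="<FS_S3A_SECRET_KEY>",
    region_name="<REGION>",
)

try:
    # Roughly what getFileStatus does for an object path: a HEAD request.
    s3.head_object(Bucket="<DATA_BUCKET>", Key="data/<SOME_EXISTING_KEY>")
    print("HEAD succeeded - these credentials can read object metadata")
except ClientError as e:
    # A 403 here reproduces the Forbidden error outside of Spark.
    print("HEAD failed:", e.response["Error"]["Code"])
```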
u
Here is a more complete stack trace in case that helps:
```
2022-03-09 09:15:53,600 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:
Traceback (most recent call last):
  File "/tmp/sample_data_lakefs_write.py", line 187, in <module>
    df.write.partitionBy(["yyyy_mm_dd", "hh_mm"]).parquet(output)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 839, in parquet
    self._jwrite.parquet(path)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o94.parquet.
: java.nio.file.AccessDeniedException: s3a://<DATA BUCKET>/data/<REPO>-<BRANCH><REDACTED> getFileStatus on s3a://<DATA BUCKET>/data/<REPO>-<BRANCH><REDACTED> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: <REDACTED>; S3 Extended Request ID: <REDACTED>; Proxy: null), S3 Extended Request ID: <REDACTED>
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:158)
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:101)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1568)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:932)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:810)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799)
	at io.lakefs.LakeFSFileSystem.createDataOutputStream(LakeFSFileSystem.java:218)
	at io.lakefs.LakeFSFileSystem.createDirectoryMarker(LakeFSFileSystem.java:649)
	at io.lakefs.LakeFSFileSystem.mkdirs(LakeFSFileSystem.java:641)
	at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1961)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:339)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:162)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply…
```
u
Sorry for nitpicking on this: you mentioned an AWS IAM policy allowing "s3*" actions (which is an invalid policy AFAICT and will not work) but also an AWS IAM policy allowing "s3:*" actions (which has a colon after "s3", and should therefore work). Just making sure the colon is in there...
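For concreteness, a valid statement would look roughly like this (the bucket name in the ARNs is a placeholder); the important part is the colon in "s3:*":
```python
import json

# Sketch of a policy document granting all S3 actions on the data bucket.
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:*",  # note the colon - "s3*" would be rejected
            "Resource": [
                "arn:aws:s3:::<DATA_BUCKET>",
                "arn:aws:s3:::<DATA_BUCKET>/*",
            ],
        }
    ],
}
print(json.dumps(policy_document, indent=2))
```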
u
Thanks for sharing the details with us @Bjorn Olsen! What Glue version are you using?
u
@Ariel Shaqed (Scolnicov) and @Tal Sofer sorry for wasting your time, I figured it out. My policy was right but I had forgotten to attach it to the user. Damn, I was sure I had checked that 🙂 At least this one was a quick fix 😄
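For next time, a quick way I could have sanity-checked the attachment from a script (the user name is a placeholder for the IAM user behind the fs.s3a credentials):
```python
import boto3

iam = boto3.client("iam")
user_name = "<GLUE_LAKEFS_USER>"

# Managed policies attached directly to the user.
attached = iam.list_attached_user_policies(UserName=user_name)
for policy in attached["AttachedPolicies"]:
    print("attached:", policy["PolicyName"])

# Inline policies embedded in the user, if any.
inline = iam.list_user_policies(UserName=user_name)
for name in inline["PolicyNames"]:
    print("inline:", name)
```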
u
No worries at all @Bjorn Olsen, happy that you are all set 🙂 Let us know if any other questions come up
u
Do you mind sharing with us the Glue version you are using?
u
Sure. Snippet from my Terraform code:
```
glue_version = "2.0"
# Spark 2.4.3, Python 3.7

number_of_workers = 2
worker_type       = "Standard"
```
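If it helps, a rough boto3 equivalent of the same job definition, including one way the assembly jar can be attached via the --extra-jars job parameter, looks something like this (names, ARNs and S3 paths are placeholders, not my real ones):
```python
import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="sample_data_lakefs_write",
    Role="<GLUE_JOB_ROLE_ARN>",
    GlueVersion="2.0",  # Spark 2.4.3, Python 3.7
    NumberOfWorkers=2,
    WorkerType="Standard",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://<SCRIPTS_BUCKET>/sample_data_lakefs_write.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        # Ship the lakeFS Hadoop filesystem assembly with the job.
        "--extra-jars": "s3://<JARS_BUCKET>/hadoop-lakefs-assembly-0.1.6.jar",
    },
)
```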
u
Thanks much @Bjorn Olsen! 🙂
u
[Hijacking this thread to advertise my favourite tool for AWS IAM] Glad to hear it's working! AWS IAM can be so annoying sometimes. The best way I know to debug AWS IAM issues is policysim.aws.amazon.com/. (GUI is probably easier than the CLI for this one...)
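If you'd rather script it than click through the GUI, the same simulator is also available through the IAM API. A rough boto3 sketch (the account, user and bucket in the ARNs are placeholders, and the caller needs permission to run the simulation):
```python
import boto3

iam = boto3.client("iam")

# Simulate a few S3 actions for the IAM user behind fs.s3a.* against the data bucket.
response = iam.simulate_principal_policy(
    PolicySourceArn="arn:aws:iam::<ACCOUNT_ID>:user/<GLUE_LAKEFS_USER>",
    ActionNames=["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
    ResourceArns=[
        "arn:aws:s3:::<DATA_BUCKET>",
        "arn:aws:s3:::<DATA_BUCKET>/*",
    ],
)

for result in response["EvaluationResults"]:
    print(result["EvalActionName"], "->", result["EvalDecision"])
```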