# dev
v
Hi Team, I have a sample file in a lakeFS repo and am trying to read it from Spark, but I am seeing a weird error. I have used the configs below.
code
Copy code
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()
df = spark.read.csv("lakefs://test-repo/main/sample.csv")
df.show()
Spark-submit
Copy code
spark-submit \
--conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem \
--conf spark.hadoop.fs.lakefs.access.key='my_key' \
--conf spark.hadoop.fs.lakefs.secret.key='my_sec_key' \
--conf spark.hadoop.fs.lakefs.endpoint='http:localhost:8000' \
--packages io.lakefs:hadoop-lakefs-assembly:0.1.12 main.py
error
Copy code
Caused by: io.lakefs.hadoop.shade.api.ApiException: Content type "text/html; charset=utf-8" is not supported for type: class io.lakefs.hadoop.shade.api.model.ObjectStats
        at io.lakefs.hadoop.shade.api.ApiClient.deserialize(ApiClient.java:822)
        at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1020)
        at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
        at io.lakefs.hadoop.shade.api.ObjectsApi.statObjectWithHttpInfo(ObjectsApi.java:1478)
        at io.lakefs.hadoop.shade.api.ObjectsApi.statObject(ObjectsApi.java:1451)
        at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:775)
        ... 19 more
Traceback (most recent call last):
  File "/Users/vaibhav/Desktop/project/sparksample/main.py", line 6, in <module>
    df = spark.read.json("<lakefs://test-repo/main/sample.json>")
  File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 284, in json
  File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
  File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.json.
: java.io.IOException: exists
        at io.lakefs.LakeFSFileSystem.exists(LakeFSFileSystem.java:906)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:784)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:782)
        at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
        at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
        at scala.util.Success.$anonfun$map$1(Try.scala:255)
        at scala.util.Success.map(Try.scala:213)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1423)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1841)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: io.lakefs.hadoop.shade.api.ApiException: Content type "text/html; charset=utf-8" is not supported for type: class io.lakefs.hadoop.shade.api.model.ObjectStatsList
        at io.lakefs.hadoop.shade.api.ApiClient.deserialize(ApiClient.java:822)
        at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1020)
        at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
        at io.lakefs.hadoop.shade.api.ObjectsApi.listObjectsWithHttpInfo(ObjectsApi.java:1149)
        at io.lakefs.hadoop.shade.api.ObjectsApi.listObjects(ObjectsApi.java:1120)
        at io.lakefs.LakeFSFileSystem.exists(LakeFSFileSystem.java:873)
        ... 16 more
You need to fix your spark.hadoop.fs.lakefs.endpoint - it should be http://localhost:8000/api/v1
Also, while using the lakeFS Hadoop FileSystem you will need to pass the credentials to s3a, as the client accesses the underlying storage directly.
The error you are getting means the API response was HTML: the endpoint was hitting the UI and not the API.
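For reference, a rough sketch of the corrected config in PySpark builder form (local setup and the placeholder keys from this thread; adjust to yours):
Copy code
from pyspark.sql import SparkSession

# The lakeFS API is served under /api/v1 - pointing the client at the bare host
# returns the UI's HTML page, which is the "text/html ... is not supported"
# error in the trace above.
spark = (
    SparkSession.builder
    .appName("PythonPageRank")
    .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
    .config("spark.hadoop.fs.lakefs.access.key", "my_key")        # placeholder
    .config("spark.hadoop.fs.lakefs.secret.key", "my_sec_key")    # placeholder
    .config("spark.hadoop.fs.lakefs.endpoint", "http://localhost:8000/api/v1")
    .getOrCreate()
)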
v
I have edited the spark-submit. It looks like below, but I am still getting the error below:
unsupported URI scheme local, lakeFS FileSystem currently supports translating s3 => s3a only
Copy code
spark-submit \
--conf spark.hadoop.fs.s3a.access.key='AKIAIOSFODNN7EXAMPLE' \
--conf spark.hadoop.fs.s3a.secret.key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' \
--conf spark.hadoop.fs.s3a.endpoint='http://localhost:8000/api/v1' \
--conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem   \
--conf spark.hadoop.fs.lakefs.access.key='AKIAJB5GNSKOJPOHALAQ' \
--conf spark.hadoop.fs.lakefs.secret.key='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' \
--conf spark.hadoop.fs.lakefs.endpoint='http:localhost:8000/api/v1' \
--packages io.lakefs:hadoop-lakefs-assembly:0.1.12 main.py
b
wait
The Spark job can work with lakeFS through the S3 gateway - that is done by pointing spark.hadoop.fs.s3a.endpoint at lakeFS. Using the gateway you don't even need the lakeFS Hadoop library.
If you follow the documentation - we want the lakeFS Hadoop library to access your underlying storage (S3) directly. So you want spark.hadoop.fs.lakefs.endpoint to point to lakeFS, and spark.hadoop.fs.lakefs.impl to use io.lakefs.LakeFSFileSystem.
The spark.hadoop.fs.s3a.* settings take the credentials of your S3 bucket.
The spark.hadoop.fs.lakefs.* settings take the credentials of your lakeFS.
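To make the split concrete, a hedged sketch of a lakeFS Hadoop FileSystem job (endpoints and keys are placeholders; the S3 ones must belong to the bucket backing your repo):
Copy code
from pyspark.sql import SparkSession

# fs.lakefs.* -> lakeFS credentials and the lakeFS API endpoint (metadata calls)
# fs.s3a.*    -> credentials of the underlying S3 bucket (the actual data)
spark = (
    SparkSession.builder
    .appName("lakefs-hadoopfs-read")
    .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
    .config("spark.hadoop.fs.lakefs.endpoint", "http://localhost:8000/api/v1")
    .config("spark.hadoop.fs.lakefs.access.key", "<lakefs-access-key>")
    .config("spark.hadoop.fs.lakefs.secret.key", "<lakefs-secret-key>")
    .config("spark.hadoop.fs.s3a.access.key", "<aws-access-key>")
    .config("spark.hadoop.fs.s3a.secret.key", "<aws-secret-key>")
    .getOrCreate()
)

df = spark.read.csv("lakefs://test-repo/main/sample.csv")
df.show()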
v
I am not using S3 in my example. I am using the lakeFS store to access the files.
I have uploaded a sample.json directly to lakeFS and want to use Spark over it.
b
Check this ^ section and in your code use s3a:// to access the data.
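Roughly like this, reusing your existing SparkSession (note the lowercase s3a scheme):
Copy code
# In gateway mode the repository takes the place of the bucket, so the object
# uploaded to lakeFS is addressed as s3a://<repo>/<branch>/<path>.
df = spark.read.json("s3a://test-repo/main/sample.json")
df.show()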
v
Following the above section, I am now using the below command
spark-submit \
--conf spark.hadoop.fs.s3a.access.key='AKIAJB5GNSKOJPOHALAQ'   \
--conf spark.hadoop.fs.s3a.secret.key='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' \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.endpoint='http:localhost:8000/api/v1' \
--packages io.lakefs:hadoop-lakefs-assembly:0.1.12 main.py
Getting the below error: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "S3a"
b
1. Can you show the code? 2. Using the S3 gateway will not use the hadoop-lakefs-assembly.
If you are planning to try/use io.lakefs:hadoop-lakefs-assembly:0.1.12 as in https://docs.lakefs.io/integrations/spark.html#use-the-lakefs-hadoop-filesystem you will need to use S3.
v
Copy code
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()
df = spark.read.json("<S3a://test-repo/main/sample.json>")
df.show()
b
fix the scheme from S3a -> s3a
v
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
I tried it after removing the assembly jar
b
the lakeFS assembly embeds hadoop-aws; you need to specify the package instead
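For example, something along these lines (a sketch; the artifact and version are assumptions and need to match your Spark/Hadoop build):
Copy code
from pyspark.sql import SparkSession

# Without the lakeFS assembly jar, S3AFileSystem has to come from another
# package, e.g. hadoop-aws (or spark-hadoop-cloud, as tried below).
spark = (
    SparkSession.builder
    .appName("s3a-gateway-read")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2")  # assumed version
    .getOrCreate()
)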
v
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket; Request ID: 9ee14ec7-f657-4343-9c5f-ff72564c7666; S3 Extended Request ID: B4438F5ECA8B5ABB; Proxy: null), S3 Extended Request ID: B4438F5ECA8B5ABB
after including --packages org.apache.spark:spark-hadoop-cloud_2.12:3.2.0
b
Your spark.hadoop.fs.s3a.endpoint should point to lakeFS (as it serves as the S3 gateway), not to the API - use http://localhost:8000.
You may also need to pass spark.hadoop.fs.s3a.connection.ssl.enabled=false
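Putting the gateway pieces together, a sketch in PySpark form (local lakeFS assumed; the s3a keys here are your lakeFS keys, since lakeFS is acting as the S3 endpoint):
Copy code
from pyspark.sql import SparkSession

# S3 gateway mode: s3a talks to lakeFS itself, so the endpoint is the lakeFS
# root (no /api/v1), path-style access is enabled, and SSL is off for plain http.
spark = (
    SparkSession.builder
    .appName("s3a-gateway-read")
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.access.key", "<lakefs-access-key>")
    .config("spark.hadoop.fs.s3a.secret.key", "<lakefs-secret-key>")
    .getOrCreate()
)

df = spark.read.json("s3a://test-repo/main/sample.json")
df.show()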
v
It worked, thank you! But in order to solve https://github.com/treeverse/lakeFS/issues/2801 I should use the Hadoop implementation. So do I need to have an S3 account, or is there any way I can bypass it?
b
The support and instructions are for using S3. To make it simple, I'd recommend going down that path.
v
Ok and even to recreate this error I think I need to use it.
b
First set up a working job that reads the data using the package. Change something in the package and verify that you submit the job with your modified version. Reproduce the issue and fix.
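One way to be sure the job runs your modified build rather than the published package (the jar path is hypothetical):
Copy code
from pyspark.sql import SparkSession

# Point Spark at the locally built assembly jar instead of --packages,
# so the job picks up your modified LakeFSFileSystem classes.
spark = (
    SparkSession.builder
    .appName("lakefs-hadoopfs-dev")
    .config("spark.jars", "/path/to/hadoop-lakefs-assembly-SNAPSHOT.jar")  # hypothetical local build
    .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
    .getOrCreate()
)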
v
The normal s3a flow is working for me. When you say package, do you mean the Hadoop implementation way?
b
yes
v
But we tried doing it earlier, right, and it needed S3 creds. So I will create an S3 account first and then try.
b
the first part of the documentation
v
Yes, for that I need an AWS account first, right?
b
yes
v
ok
When I put a spark.read.parquet("lakefs://repo-that-doesnt-exist/main/path/to/data"), what is the first place the control comes to, or what is the general flow when we read data in Spark? Is it this piece of code: https://github.com/treeverse/lakeFS/tree/master/clients/hadoopfs
b
One of the properties you pass to the job is fs.lakefs.impl, set to use io.lakefs.LakeFSFileSystem. The reader implementation will access the LakeFSFileSystem to read the content.
Any call that tries to access or query a path in LakeFSFileSystem will use the lakeFS API to translate the logical address to the physical one.
At some point you should get to open, but it may query stat or other calls before this one.
v
Seeing the error trace, I am looking into this file. Does the control after spark.read first come to getFileStatus at Line 763? Please confirm.
b
Hi @Vaibhav Kumar, I don't think we need to confirm. It depends on the Spark version and the reader implementation. We provide a file system abstraction, so as long as the lakeFS FileSystem is used you are on the right path.
v
I am going through the code and trying to connect the dots. Java is new for me as well, so I am trying to get direction.
šŸ‘ 1
b
We provide a layer to access the data; how Spark uses it depends. You can check the spec tests provided to see how it can be used.
a
Hi @Vaibhav Kumar, Thanks for working on #2801! You asked:
When I put a spark.read.parquet("lakefs://repo-that-doesnt-exist/main/path/to/data"), what is the first place the control comes to, or what is the general flow when we read data in Spark? Is it this piece of code: https://github.com/treeverse/lakeFS/tree/master/clients/hadoopfs
To expand on @Barak Amar's answer: LakeFSFileSystem is a Hadoop FileSystem. Spark and Hadoop perform many FileSystem operations in order to read an object, many of them appear unnecessary. So when you perform spark.read on a LakeFSFileSystem you will eventually get to some code there. But it will not necessarily be what you think: you may see calls to getFileStatus(), listFiles(), and even listStatus() and exists() before you get to open(). All of them will have to work. Fortunately, typically you get the same credentials issues for all operations, so as soon as one works the others fall in line and start working. The issue you are working on is to improve the error messages received -- because of the depth of the call stack, they are often very confusing.
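If it helps with placing breakpoints, here is a rough sketch (using your existing spark session and the lakeFS configs from earlier) that drives the same FileSystem calls directly, so you can see which one fails first:
Copy code
# Resolve the Hadoop FileSystem that spark.read would use for a lakefs:// path
# and call the operations Spark calls, one at a time.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
jvm = spark.sparkContext._jvm
path = jvm.org.apache.hadoop.fs.Path("lakefs://test-repo/main/sample.json")

fs = path.getFileSystem(hadoop_conf)   # io.lakefs.LakeFSFileSystem when fs.lakefs.impl is set
print(fs.exists(path))                 # exists() -> listObjects, per the trace above
print(fs.getFileStatus(path))          # getFileStatus() -> statObject
stream = fs.open(path)                 # open() is where the object data is finally read
stream.close()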
v
@Ariel Shaqed (Scolnicov) Thank you for such a detailed explanation. In order to check on which line of the code I am getting the error, I will use a debugger. I was going through LakeFSFileSystem.java line by line. Is there any particular place I should look in the code, or am I going in the right direction? Please let me know if I understood your point correctly.
a
Sure! I rarely use debuggers nowadays, so I am perhaps the wrong person to ask. That said, I would recommend putting a breakpoint on every function marked @Override, and at least on open, getFileStatus, listFiles, listStatus.