
Vaibhav Kumar

03/17/2023, 7:29 PM
Hi Team, I have a sample file in a lakeFS repo and am trying to read it from Spark, but I am seeing a weird error. I have used the configs below. Code:
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()
df = spark.read.csv("lakefs://test-repo/main/sample.csv")
df.show()
spark-submit:
spark-submit \
  --conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem \
  --conf spark.hadoop.fs.lakefs.access.key='my_key' \
  --conf spark.hadoop.fs.lakefs.secret.key='my_sec_key' \
  --conf spark.hadoop.fs.lakefs.endpoint='http:localhost:8000' \
  --packages io.lakefs:hadoop-lakefs-assembly:0.1.12 \
  main.py
Error:
Caused by: io.lakefs.hadoop.shade.api.ApiException: Content type "text/html; charset=utf-8" is not supported for type: class io.lakefs.hadoop.shade.api.model.ObjectStats
        at io.lakefs.hadoop.shade.api.ApiClient.deserialize(ApiClient.java:822)
        at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1020)
        at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
        at io.lakefs.hadoop.shade.api.ObjectsApi.statObjectWithHttpInfo(ObjectsApi.java:1478)
        at io.lakefs.hadoop.shade.api.ObjectsApi.statObject(ObjectsApi.java:1451)
        at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:775)
        ... 19 more
Traceback (most recent call last):
  File "/Users/vaibhav/Desktop/project/sparksample/main.py", line 6, in <module>
    df = spark.read.json("lakefs://test-repo/main/sample.json")
  File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 284, in json
  File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
  File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.json.
: java.io.IOException: exists
        at io.lakefs.LakeFSFileSystem.exists(LakeFSFileSystem.java:906)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:784)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:782)
        at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
        at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
        at scala.util.Success.$anonfun$map$1(Try.scala:255)
        at scala.util.Success.map(Try.scala:213)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1423)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1841)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: io.lakefs.hadoop.shade.api.ApiException: Content type "text/html; charset=utf-8" is not supported for type: class io.lakefs.hadoop.shade.api.model.ObjectStatsList
        at io.lakefs.hadoop.shade.api.ApiClient.deserialize(ApiClient.java:822)
        at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1020)
        at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
        at io.lakefs.hadoop.shade.api.ObjectsApi.listObjectsWithHttpInfo(ObjectsApi.java:1149)
        at io.lakefs.hadoop.shade.api.ObjectsApi.listObjects(ObjectsApi.java:1120)
        at io.lakefs.LakeFSFileSystem.exists(LakeFSFileSystem.java:873)
        ... 16 more
You need to fix your spark.hadoop.fs.lakefs.endpoint - it should be http://localhost:8000/api/v1.
Also, while using the lakeFS Hadoop FileSystem you will need to pass credentials to s3a as well, since the client accesses the underlying storage directly.
The error you are getting means the API response was HTML: with that endpoint the client was reaching the UI and not the API.
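For reference, a minimal sketch of the corrected client configuration ('my_key'/'my_sec_key' are the placeholders from the original spark-submit; the important change is the /api/v1 suffix on the endpoint):

from pyspark.sql import SparkSession

# Sketch only - equivalent to passing the same values as --conf flags to spark-submit.
spark = (
    SparkSession.builder
    .appName("PythonPageRank")
    .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
    .config("spark.hadoop.fs.lakefs.endpoint", "http://localhost:8000/api/v1")
    .config("spark.hadoop.fs.lakefs.access.key", "my_key")
    .config("spark.hadoop.fs.lakefs.secret.key", "my_sec_key")
    .getOrCreate()
)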

Vaibhav Kumar

03/17/2023, 7:49 PM
I have edited the spark-submit command as below, but I am still getting the following error:
unsupported URI scheme local, lakeFS FileSystem currently supports translating s3 => s3a only
spark-submit \
--conf spark.hadoop.fs.s3a.access.key='AKIAIOSFODNN7EXAMPLE' \
--conf spark.hadoop.fs.s3a.secret.key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' \
--conf spark.hadoop.fs.s3a.endpoint='http://localhost:8000/api/v1' \
--conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem   \
--conf spark.hadoop.fs.lakefs.access.key='AKIAJB5GNSKOJPOHALAQ' \
--conf spark.hadoop.fs.lakefs.secret.key='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' \
--conf spark.hadoop.fs.lakefs.endpoint='http:localhost:8000/api/v1' \
--packages io.lakefs:hadoop-lakefs-assembly:0.1.12 \
main.py

Barak Amar

03/17/2023, 7:50 PM
wait
The Spark job can work with lakeFS using the S3 gateway - you do that by pointing spark.hadoop.fs.s3a.endpoint at lakeFS. Using the gateway you don't even need the lakeFS Hadoop library.
If you follow the documentation - we want the lakeFS Hadoop library to access your underlying storage (S3) directly.
So you want:
spark.hadoop.fs.lakefs.endpoint to point to lakeFS,
spark.hadoop.fs.lakefs.impl to use io.lakefs.LakeFSFileSystem,
the spark.hadoop.fs.s3a.* credentials to be those of your S3 bucket,
the spark.hadoop.fs.lakefs.* credentials to be those of your lakeFS.
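Putting that split together, a sketch of what the lakeFS Hadoop FileSystem setup could look like (all key values below are illustrative placeholders, not real credentials):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("lakefs-hadoopfs")
    # lakeFS Hadoop FileSystem: endpoint and credentials of lakeFS itself
    .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
    .config("spark.hadoop.fs.lakefs.endpoint", "http://localhost:8000/api/v1")
    .config("spark.hadoop.fs.lakefs.access.key", "<lakefs-access-key>")
    .config("spark.hadoop.fs.lakefs.secret.key", "<lakefs-secret-key>")
    # credentials of the underlying S3 bucket, which the client accesses directly via s3a
    .config("spark.hadoop.fs.s3a.access.key", "<aws-access-key>")
    .config("spark.hadoop.fs.s3a.secret.key", "<aws-secret-key>")
    .getOrCreate()
)

df = spark.read.json("lakefs://test-repo/main/sample.json")
df.show()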

Vaibhav Kumar

03/17/2023, 7:55 PM
I am not using S3 in my example. I am using the lakeFS store to access the files.
I have uploaded a sample.json directly to lakeFS and want to use Spark over it.

Barak Amar

03/17/2023, 7:58 PM
Check this ^ section, and in your code use s3a:// to access the data.
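In other words, only the scheme in the read path changes; a sketch, keeping the same repository and branch layout as before:

# repository and branch remain part of the path; only lakefs:// becomes s3a://
df = spark.read.json("s3a://test-repo/main/sample.json")
df.show()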

Vaibhav Kumar

03/17/2023, 8:04 PM
Following the above section, I am now using the below command
spark-submit \
--conf spark.hadoop.fs.s3a.access.key='AKIAJB5GNSKOJPOHALAQ'   \
--conf spark.hadoop.fs.s3a.secret.key='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' \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.endpoint='http:localhost:8000/api/v1' \
--packages io.lakefs:hadoop-lakefs-assembly:0.1.12 \
main.py
Getting the below error: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "S3a"

Barak Amar

03/17/2023, 8:06 PM
1. Can you show the code? 2. Using the S3 gateway will not use the hadoop-lakefs-assembly.
If you are planning to try/use io.lakefs:hadoop-lakefs-assembly:0.1.12 as in https://docs.lakefs.io/integrations/spark.html#use-the-lakefs-hadoop-filesystem, you will need to use S3.

Vaibhav Kumar

03/17/2023, 8:09 PM
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()
df = spark.read.json("S3a://test-repo/main/sample.json")
df.show()

Barak Amar

03/17/2023, 8:09 PM
fix the scheme from S3a -> s3a

Vaibhav Kumar

03/17/2023, 8:10 PM
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
I tried it after removing the assembly jar

Barak Amar

03/17/2023, 8:11 PM
The lakeFS assembly embeds hadoop-aws. You need to specify the package instead.
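For example, something along these lines (a sketch; the hadoop-aws version here is an assumption and should match the Hadoop version bundled with your Spark build - the pre-built Spark 3.3.2 ships with Hadoop 3.3.2):

from pyspark.sql import SparkSession

# spark.jars.packages is honored here only because Python launches the JVM;
# with spark-submit you would pass --packages org.apache.hadoop:hadoop-aws:3.3.2 instead.
spark = (
    SparkSession.builder
    .appName("s3a-via-gateway")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2")
    .getOrCreate()
)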

Vaibhav Kumar

03/17/2023, 8:17 PM
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket; Request ID: 9ee14ec7-f657-4343-9c5f-ff72564c7666; S3 Extended Request ID: B4438F5ECA8B5ABB; Proxy: null), S3 Extended Request ID: B4438F5ECA8B5ABB
after including --packages org.apache.spark:spark-hadoop-cloud_2.12:3.2.0

Barak Amar

03/17/2023, 8:24 PM
Your spark.hadoop.fs.s3a.endpoint should point to lakeFS itself (as it serves as an S3 gateway), not the API - use http://localhost:8000.
You may also need to pass spark.hadoop.fs.s3a.connection.ssl.enabled=false
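Putting the gateway pieces together, a sketch of the full configuration (equivalent to passing the same values as --conf flags to spark-submit; the keys are illustrative placeholders):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("lakefs-s3-gateway")
    # lakeFS itself acts as the S3 endpoint - no /api/v1 suffix here
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    # lakeFS access/secret keys, passed as the s3a credentials
    .config("spark.hadoop.fs.s3a.access.key", "<lakefs-access-key>")
    .config("spark.hadoop.fs.s3a.secret.key", "<lakefs-secret-key>")
    .getOrCreate()
)

df = spark.read.json("s3a://test-repo/main/sample.json")
df.show()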

Vaibhav Kumar

03/18/2023, 1:09 PM
It worked, thank you. But in order to solve https://github.com/treeverse/lakeFS/issues/2801 I should use the Hadoop implementation. So do I need to have an S3 account, or is there any way I can bypass it?

Barak Amar

03/18/2023, 1:12 PM
The support and instructions are for using S3. To keep it simple, I'd recommend going down that path.

Vaibhav Kumar

03/18/2023, 1:13 PM
Ok and even to recreate this error I think I need to use it.

Barak Amar

03/18/2023, 1:27 PM
First set up a working job that reads the data using the package. Change something in the package and verify that you submit the job with your modified version. Reproduce the issue and fix.

Vaibhav Kumar

03/18/2023, 1:29 PM
The normal s3a flow is working for me. When you say package, do you mean the Hadoop implementation way?

Barak Amar

03/18/2023, 1:29 PM
yes

Vaibhav Kumar

03/18/2023, 1:30 PM
But we tried doing it earlier, right, and it needed S3 creds. So I will create an S3 account first and then try.

Barak Amar

03/18/2023, 1:30 PM
the first part of the documentation

Vaibhav Kumar

03/18/2023, 1:30 PM
Yes, for that I need an AWS account first, right?

Barak Amar

03/18/2023, 1:33 PM
yes

Vaibhav Kumar

03/18/2023, 1:34 PM
ok
When I put a
spark.read.parquet("lakefs://repo-that-doesnt-exist/main/path/to/data")
what is the first place the control comes to, and what is the general flow when reading data in Spark? Is it this piece of code: https://github.com/treeverse/lakeFS/tree/master/clients/hadoopfs

Barak Amar

03/18/2023, 1:53 PM
One of the properties you pass to the job is fs.lakefs.impl, set to io.lakefs.LakeFSFileSystem. The reader implementation will access the LakeFSFileSystem to read the content.
Any call that tries to access or query a path in LakeFSFileSystem will use the lakeFS API to translate the logical address to the physical one.
At some point you should get to open, but it may query stat or other calls before that.
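To see those FileSystem calls happen outside of any particular reader, here is a rough sketch that drives the Hadoop FileSystem API directly from PySpark (assuming the fs.lakefs.* configuration shown earlier is already in place; the path is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("fs-calls").getOrCreate()
jvm = spark._jvm
hadoop_conf = spark._jsc.hadoopConfiguration()

path = jvm.org.apache.hadoop.fs.Path("lakefs://test-repo/main/sample.json")
fs = path.getFileSystem(hadoop_conf)   # resolved to io.lakefs.LakeFSFileSystem via fs.lakefs.impl
status = fs.getFileStatus(path)        # a stat-style call, answered through the lakeFS API
print(status.getLen())
stream = fs.open(path)                 # open() reads via the translated physical address
stream.close()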

Vaibhav Kumar

03/18/2023, 6:32 PM
Looking at the error trace, I am going through this file. After spark.read, does control first come to getFileStatus at line 763? Please confirm.

Barak Amar

03/18/2023, 7:00 PM
Hi @Vaibhav Kumar, I don't think we need to confirm that. It depends on the Spark version and the reader implementation. We provide a file system abstraction, so as long as the lakeFS FileSystem is used you are on the right path.

Vaibhav Kumar

03/18/2023, 7:08 PM
I am going through the code and trying to connect the dots. Java is also new for me, so I am trying to get some direction.
👍 1

Barak Amar

03/18/2023, 7:20 PM
We provide a layer to access the data; how Spark uses it varies. You can check the spec tests provided to see how it can be used.

Ariel Shaqed (Scolnicov)

03/19/2023, 7:40 AM
Hi @Vaibhav Kumar, Thanks for working on #2801! You asked:
When put a
spark.read.parquet("lakefs://repo-that-doesnt-exist/main/path/to/data")
what is the first place the control comes to or what is the general flow while we read data in spark?
Is it this piece of code https://github.com/treeverse/lakeFS/tree/master/clients/hadoopfs
To expand on @Barak Amar’s answer: LakeFSFileSystem is a Hadoop FileSystem. Spark and Hadoop perform many FileSystem operations in order to read an object, many of them appear unnecessary. So when you perform spark.read on a LakeFSFileSystem you will eventually get to some code there. But it will not necessarily be what you think: you may see calls to getFileStatus(), listFiles(), and even listStatus() and exists() before you get to open(). All of them will have to work. Fortunately, typically you get the same credentials issues for all operations, so as soon as one works the others fall in line and start working. The issue you are working on is to improve the error messages received -- because of the depth of the call stack, they are often very confusing.

Vaibhav Kumar

03/19/2023, 7:32 PM
@Ariel Shaqed (Scolnicov) Thank you for such a detailed explanation. In order to check on which line of the code I am getting the error, I will use a debugger. So I was going through LakeFSFileSystem.java line by line. Is there any particular place I should look in the code, or am I going in the right direction? Please let me know if I understood your point correctly.

Ariel Shaqed (Scolnicov)

03/20/2023, 10:54 AM
Sure! I rarely use debuggers nowadays, so I am perhaps the wrong person to ask. That said, I would recommend putting a breakpoint on every function marked @Override, and at least on open, getFileStatus, listFiles, and listStatus.