Vaibhav Kumar
03/17/2023, 7:29 PM
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("PythonPageRank")\
.getOrCreate()
df = spark.read.csv("lakefs://test-repo/main/sample.csv")
df.show()
Spark-submit:
spark-submit \
  --conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem \
  --conf spark.hadoop.fs.lakefs.access.key='my_key' \
  --conf spark.hadoop.fs.lakefs.secret.key='my_sec_key' \
  --conf spark.hadoop.fs.lakefs.endpoint='http://localhost:8000' \
  --packages io.lakefs:hadoop-lakefs-assembly:0.1.12 \
  main.py
Error:
Caused by: io.lakefs.hadoop.shade.api.ApiException: Content type "text/html; charset=utf-8" is not supported for type: class io.lakefs.hadoop.shade.api.model.ObjectStats
at io.lakefs.hadoop.shade.api.ApiClient.deserialize(ApiClient.java:822)
at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1020)
at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
at io.lakefs.hadoop.shade.api.ObjectsApi.statObjectWithHttpInfo(ObjectsApi.java:1478)
at io.lakefs.hadoop.shade.api.ObjectsApi.statObject(ObjectsApi.java:1451)
at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:775)
... 19 more
Traceback (most recent call last):
File "/Users/vaibhav/Desktop/project/sparksample/main.py", line 6, in <module>
df = spark.read.json("lakefs://test-repo/main/sample.json")
File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 284, in json
File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
File "/usr/local/Cellar/apache-spark/3.3.2/libexec/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.json.
: java.io.IOException: exists
at io.lakefs.LakeFSFileSystem.exists(LakeFSFileSystem.java:906)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:784)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:782)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1423)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1841)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: io.lakefs.hadoop.shade.api.ApiException: Content type "text/html; charset=utf-8" is not supported for type: class io.lakefs.hadoop.shade.api.model.ObjectStatsList
at io.lakefs.hadoop.shade.api.ApiClient.deserialize(ApiClient.java:822)
at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1020)
at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
at io.lakefs.hadoop.shade.api.ObjectsApi.listObjectsWithHttpInfo(ObjectsApi.java:1149)
at io.lakefs.hadoop.shade.api.ObjectsApi.listObjects(ObjectsApi.java:1120)
at io.lakefs.LakeFSFileSystem.exists(LakeFSFileSystem.java:873)
... 16 more
Barak Amar
03/17/2023, 7:35 PM
spark.hadoop.fs.lakefs.endpoint - http://localhost:8000/api/v1
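In PySpark terms, a minimal sketch of that fix (same placeholder keys as in the command above, launched with --packages io.lakefs:hadoop-lakefs-assembly:0.1.12):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PythonPageRank")
    .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
    .config("spark.hadoop.fs.lakefs.access.key", "my_key")      # placeholder
    .config("spark.hadoop.fs.lakefs.secret.key", "my_sec_key")  # placeholder
    # /api/v1 is the missing piece: without it the client hits the lakeFS UI
    # and gets "text/html" back instead of JSON, which is the ApiException above.
    .config("spark.hadoop.fs.lakefs.endpoint", "http://localhost:8000/api/v1")
    .getOrCreate()
)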
Vaibhav Kumar
03/17/2023, 7:49 PM
unsupported URI scheme local, lakeFS FileSystem currently supports translating s3 => s3a only
spark-submit \
--conf spark.hadoop.fs.s3a.access.key='AKIAIOSFODNN7EXAMPLE' \
--conf spark.hadoop.fs.s3a.secret.key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' \
--conf spark.hadoop.fs.s3a.endpoint='http://localhost:8000/api/v1' \
--conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem \
--conf spark.hadoop.fs.lakefs.access.key='AKIAJB5GNSKOJPOHALAQ' \
--conf spark.hadoop.fs.lakefs.secret.key='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' \
--conf spark.hadoop.fs.lakefs.endpoint='http://localhost:8000/api/v1' \
--packages io.lakefs:hadoop-lakefs-assembly:0.1.12 \
main.py
Barak Amar
03/17/2023, 7:50 PM
- spark.hadoop.fs.s3a.endpoint to point to lakeFS.
- spark.hadoop.fs.lakefs.endpoint to point to lakeFS.
- spark.hadoop.fs.lakefs.impl to use io.lakefs.LakeFSFileSystem.
- spark.hadoop.fs.s3a.* credentials of your S3 bucket.
- spark.hadoop.fs.lakefs.* credentials of your lakeFS.
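A sketch of that credential split in one session (every key below is a placeholder; adjust the endpoints to your deployment):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PythonPageRank")
    # lakefs://... paths go through the lakeFS FileSystem and its API endpoint
    .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
    .config("spark.hadoop.fs.lakefs.access.key", "my_lakefs_key")
    .config("spark.hadoop.fs.lakefs.secret.key", "my_lakefs_secret")
    .config("spark.hadoop.fs.lakefs.endpoint", "http://localhost:8000/api/v1")
    # s3a credentials are those of the underlying S3 bucket holding the data
    .config("spark.hadoop.fs.s3a.access.key", "my_s3_key")
    .config("spark.hadoop.fs.s3a.secret.key", "my_s3_secret")
    .getOrCreate()
)

df = spark.read.json("lakefs://test-repo/main/sample.json")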
Vaibhav Kumar
03/17/2023, 7:55 PM
Barak Amar
03/17/2023, 7:58 PM
Use s3a:// to access the data.
Vaibhav Kumar
03/17/2023, 8:04 PM
spark-submit \
--conf spark.hadoop.fs.s3a.access.key='AKIAJB5GNSKOJPOHALAQ' \
--conf spark.hadoop.fs.s3a.secret.key='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' \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.endpoint='http://localhost:8000/api/v1' \
--packages io.lakefs:hadoop-lakefs-assembly:0.1.12 \
main.py
Barak Amar
03/17/2023, 8:06 PM
Vaibhav Kumar
03/17/2023, 8:09 PM
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("PythonPageRank")\
.getOrCreate()
df = spark.read.json("s3a://test-repo/main/sample.json")
df.show()
Barak Amar
03/17/2023, 8:09 PM
Vaibhav Kumar
03/17/2023, 8:10 PM
Barak Amar
03/17/2023, 8:11 PM
Vaibhav Kumar
03/17/2023, 8:17 PM
--packages org.apache.spark:spark-hadoop-cloud_2.12:3.2.0
Barak Amar
03/17/2023, 8:24 PM
spark.hadoop.fs.s3a.endpoint should point to lakeFS (as it serves as an S3 gateway), not the API - use http://localhost:8000. Also set spark.hadoop.fs.s3a.connection.ssl.enabled=false.
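Putting those two corrections together, a sketch of the gateway-style setup (keys are placeholders; assumes lakeFS listening on localhost:8000 and a launch with --packages org.apache.spark:spark-hadoop-cloud_2.12:3.2.0 as above):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PythonPageRank")
    # s3a talks to the lakeFS server root, which serves as an S3 gateway -
    # not to the /api/v1 REST path - and over plain HTTP here.
    .config("spark.hadoop.fs.s3a.access.key", "my_lakefs_key")     # placeholder
    .config("spark.hadoop.fs.s3a.secret.key", "my_lakefs_secret")  # placeholder
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

df = spark.read.json("s3a://test-repo/main/sample.json")
df.show()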
Vaibhav Kumar
03/18/2023, 1:09 PM
Barak Amar
03/18/2023, 1:12 PM
Vaibhav Kumar
03/18/2023, 1:13 PM
Barak Amar
03/18/2023, 1:27 PM
Vaibhav Kumar
03/18/2023, 1:29 PM
Barak Amar
03/18/2023, 1:29 PM
Vaibhav Kumar
03/18/2023, 1:30 PM
Barak Amar
03/18/2023, 1:30 PM
Vaibhav Kumar
03/18/2023, 1:30 PM
Barak Amar
03/18/2023, 1:33 PM
Vaibhav Kumar
03/18/2023, 1:34 PM
When I put spark.read.parquet("lakefs://repo-that-doesnt-exist/main/path/to/data"), what is the first place the control comes to, or what is the general flow when we read data in Spark?
Is it this piece of code: https://github.com/treeverse/lakeFS/tree/master/clients/hadoopfs
Barak Amar
03/18/2023, 1:53 PM
With fs.lakefs.impl set to use io.lakefs.LakeFSFileSystem, reading will eventually call open - but it may query stat or other calls before this one.
Vaibhav Kumar
03/18/2023, 6:32 PM
So spark.read comes to getFileStatus at line 763. Please confirm?
Barak Amar
03/18/2023, 7:00 PM
Vaibhav Kumar
03/18/2023, 7:08 PM
Barak Amar
03/18/2023, 7:20 PM
Ariel Shaqed (Scolnicov)
03/19/2023, 7:40 AM
> When I put spark.read.parquet("lakefs://repo-that-doesnt-exist/main/path/to/data"), what is the first place the control comes to, or what is the general flow when we read data in Spark? Is it this piece of code: https://github.com/treeverse/lakeFS/tree/master/clients/hadoopfs
To expand on @Barak Amar's answer: LakeFSFileSystem is a Hadoop FileSystem. Spark and Hadoop perform many FileSystem operations in order to read an object; many of them appear unnecessary. So when you perform spark.read on a LakeFSFileSystem you will eventually get to some code there. But it will not necessarily be what you think: you may see calls to getFileStatus(), listFiles(), and even listStatus() and exists() before you get to open(). All of them will have to work. Fortunately, you typically get the same credentials issues for all operations, so as soon as one works the others fall in line and start working. The issue you are working on is to improve the error messages received - because of the depth of the call stack, they are often very confusing.
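One hedged way to watch those calls yourself from PySpark is to poke the same Hadoop FileSystem through the py4j gateway (the path is the hypothetical one from the question; spark._jvm and spark._jsc are internal but commonly used handles):

# Sketch: exercise the FileSystem that spark.read would use, call by call,
# to see which operation (exists, stat, open) fails first.
jvm = spark._jvm
path = jvm.org.apache.hadoop.fs.Path("lakefs://repo-that-doesnt-exist/main/path/to/data")
fs = path.getFileSystem(spark._jsc.hadoopConfiguration())
print(fs.exists(path))         # exists() -> listObjects, per the second stack trace above
print(fs.getFileStatus(path))  # getFileStatus() -> statObject; raises if the path is missing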
Vaibhav Kumar
03/19/2023, 7:32 PM
Ariel Shaqed (Scolnicov)
03/20/2023, 10:54 AM
open, getFileStatus, listFiles, listStatus.