Using the lakeFS Java API and Hadoop FileSystem from a Scala/SBT Spark job
a

Alessandro Mangone

12/05/2022, 12:30 PM
Hello, I am trying to use the Java API in a Scala codebase, and I am getting an error that was already reported by a user: https://lakefs.slack.com/archives/C02CV7MUV4G/p1666253775100359?thread_ts=1666251707.039089&cid=C02CV7MUV4G In my case I am using SBT instead of Maven and don’t have any other dependencies using okhttp3. What could be causing this issue? I am using the sbt-assembly plugin to create an uber-jar; I don’t know if I need to be careful with some merge rule, but I don’t see any dependency conflict.
👀 1
g

Guy Hardonag

12/05/2022, 12:59 PM
Hey @Alessandro Mangone, I’m looking into it. Which package did you specify in SBT? Is it io.lakefs:api-client:0.86.0?
a

Alessandro Mangone

12/05/2022, 1:01 PM
I am using these libraries:
"io.lakefs" % "hadoop-lakefs-assembly" % "0.1.9",
"io.lakefs" %% "lakefs-spark-client-312-hadoop3" % "0.5.1"
which include version 0.56.0 of the API client.
But when I try to run sbt assembly with those settings I run out of heap space, so maybe something is wrong there.
b

Barak Amar

12/05/2022, 1:10 PM
Are you going to use the above API inside a Spark job?
a

Alessandro Mangone

12/05/2022, 1:10 PM
yes
b

Barak Amar

12/05/2022, 1:12 PM
The shading instructions were meant to prevent conflicts with other okhttp implementations when the jar is supplied as a library.
You can first try to use the client package without any shading and submit the job.
a

Alessandro Mangone

12/05/2022, 1:13 PM
Ideally I’m trying to do this from inside my Spark Job:
👍 1
so you suggest to directly import the api client instead of using hadoop/spark integrations?
but even that dependency will contain okhttp3
b

Barak Amar

12/05/2022, 1:14 PM
Are you going to use the lakeFS hadoop implementation for direct upload?
if it is not the case - yes, I suggest you include the client first
There will be no shading, and the package and its dependencies will be used inside your Spark job
I assume the job will work directly with lakeFS through the S3 gateway
a

Alessandro Mangone

12/05/2022, 1:16 PM
I am using S3 so I wanted to try the optimized lakefs upload
In such a way that lakefs only handles metadata but uploads are directly submitted to s3
b

Barak Amar

12/05/2022, 1:17 PM
I see - so can you try to import the hadoop-lakefs-assembly
👍 1
The jar will include the client and the shaded dependencies.
When you import the assembly, the client should be shaded under io.lakefs.shaded.api.
I assume that if you instantiate it and call the API it should work, as all the dependencies are shaded.
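(For illustration, a rough Scala sketch of calling the API through the assembly jar. The shaded package prefix follows the io.lakefs.shaded.api path mentioned above, and the ApiClient/RepositoriesApi class names are assumed from the generated client - verify the actual relocated package and class names inside the jar for your assembly version.)

// Hypothetical sketch - package prefix and class names depend on the
// hadoop-lakefs-assembly version; check the jar contents before relying on them.
import io.lakefs.shaded.api.ApiClient
import io.lakefs.shaded.api.RepositoriesApi

val client = new ApiClient()
client.setBasePath("https://lakefs.example.com/api/v1") // lakeFS API endpoint
client.setUsername("<access-key-id>")                   // basic-auth credentials
client.setPassword("<secret-access-key>")

val repositories = new RepositoriesApi(client)
// ... call whichever endpoints you need on `repositories`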
a

Alessandro Mangone

12/05/2022, 1:27 PM
Ok, I’ll give it a try. Some methods seem to require some extra arguments. I’ll get back as soon as I manage to assemble and test the code.
After some tests I managed to configure the job, but I am getting a new error:
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.lakefs.impl=null: No AbstractFileSystem configured for scheme: lakefs
But it is set in the Spark config?
("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
b

Barak Amar

12/05/2022, 3:35 PM
If you are using spark-submit --conf ..., the key is spark.hadoop.fs.lakefs.impl. From inside Spark, when you set it programmatically, the key is fs.lakefs.impl.
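(For illustration, a minimal Scala sketch of the two equivalent ways to set this property:)

import org.apache.spark.sql.SparkSession

// 1. Through the Spark config, using the spark.hadoop. prefix
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
  .getOrCreate()

// 2. Directly on the Hadoop configuration, without the prefix
spark.sparkContext.hadoopConfiguration
  .set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")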
a

Alessandro Mangone

12/05/2022, 3:36 PM
It depends on how you set the conf keys - whether you do it directly in the Hadoop configuration or in the configuration of the SparkSession.
b

Barak Amar

12/05/2022, 3:37 PM
yes
The tabs in the documentation link I posted show some examples.
a

Alessandro Mangone

12/05/2022, 3:38 PM
SparkSession.builder().master("local[2]").config("spark.hadoop.fs.lakefs.impl","io.lakefs.LakeFSFileSystem")
this should work as well
but I’ll try the other way with hadoopConfiguration
b

Barak Amar

12/05/2022, 3:41 PM
It will probably do the same - I wasn't sure what 'sparkconfig' meant for you.
So the question is why the Spark cluster doesn't get these properties.
a

Alessandro Mangone

12/05/2022, 3:43 PM
Or maybe I should also ask whether I’m implementing this the right way, because it is a little bit confusing when I should use lakefs://, s3:// or s3a://.
When I initialize the API client, the scheme is set to lakefs, but my namespace is set to s3://bucket…/mycollection, and then when I use Spark read/write I use lakefs://.
b

Barak Amar

12/05/2022, 3:45 PM
The API client works over HTTP - so it will get https://<lakefs endpoint>.
The s3 and s3a schemes should be configured to work directly with S3 and have credentials to access the lakeFS bucket.
The lakefs:// scheme should be used when reading/writing data from Spark - it will use io.lakefs.LakeFSFileSystem to get the metadata from lakeFS and s3a for the data.
So the first test I'd write is just writing some data (csv/parquet) to lakeFS using lakefs://<repo>/<branch>/location.
lakefs:// will only work if we set the credentials and endpoint (spark.hadoop.fs.lakefs.endpoint=https://lakefs.example.com/api/v1)
spark-shell --conf spark.hadoop.fs.s3a.access.key='AKIAIOSFODNN7EXAMPLE' \
              --conf spark.hadoop.fs.s3a.secret.key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' \
              --conf spark.hadoop.fs.s3a.endpoint='https://s3.eu-central-1.amazonaws.com' \
              --conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem \
              --conf spark.hadoop.fs.lakefs.access.key=AKIAlakefs12345EXAMPLE \
              --conf spark.hadoop.fs.lakefs.secret.key=abc/lakefs/1234567bPxRfiCYEXAMPLEKEY \
              --conf spark.hadoop.fs.lakefs.endpoint=https://lakefs.example.com/api/v1 \
              --packages io.lakefs:hadoop-lakefs-assembly:0.1.8
              ...
something like the example should work
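(As a first smoke test along these lines, something like the following - repository and branch names are placeholders - should round-trip data through the lakefs:// scheme from a spark-shell configured as above:)

// Assumes the fs.lakefs.* and fs.s3a.* properties from the example above are set.
val df = spark.range(100).toDF("id")
df.write.mode("overwrite").parquet("lakefs://example-repo/main/smoke-test/")
spark.read.parquet("lakefs://example-repo/main/smoke-test/").show()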
a

Alessandro Mangone

12/05/2022, 3:49 PM
yes, this works quite well with your playground, but I am struggling to make it work with AWS because of the lakefs filesystem 🙂
and in AWS you have so many different prefixes
b

Barak Amar

12/05/2022, 3:50 PM
the lakefs:// implementation uses s3a:// addressing for reading/writing the data
you can drop 'spark.hadoop.fs.s3a.endpoint' from the above example - it will use the default
from the last error you posted it looks like the property we set wasn't passed to the place where the code runs.
a

Alessandro Mangone

12/05/2022, 3:51 PM
yes I suspect that too
b

Barak Amar

12/05/2022, 3:52 PM
can you try to submit your code with spark submit?
a

Alessandro Mangone

12/05/2022, 3:52 PM
I am recompiling with config setup using hadoopConfiguration
and will submit asap
👍 1
now I am getting a new error:
Caused by: java.lang.ClassNotFoundException: Class io.lakefs.LakeFSFileSystem not found
b

Barak Amar

12/05/2022, 4:01 PM
Can we apply these settings at the cluster level or via spark-submit?
a

Alessandro Mangone

12/05/2022, 4:02 PM
I’ll try with the spark submit
with spark submit I’m getting the previous error:
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.lakefs.impl=null: No AbstractFileSystem configured for scheme: lakefs
--conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem \
--conf spark.hadoop.fs.lakefs.access.key=XXX \
--conf "spark.hadoop.fs.lakefs.secret.key=XXX" \
--conf spark.hadoop.fs.lakefs.endpoint=http://lakefs-int.eu-central-1.dataservices.xxx.cloud/api/v1 \
--conf spark.hadoop.lakefs.api.url=http://lakefs-int.eu-central-1.dataservices.xxx.cloud/api/v1 \
--conf spark.hadoop.lakefs.api.access_key=XXX \
--conf "spark.hadoop.lakefs.api.secret_key=XXX" \
b

Barak Amar

12/05/2022, 4:47 PM
Looks like the same issue - the cluster can't locate the implementation. Can you paste the complete spark-submit command (without secrets)?
a

Alessandro Mangone

12/05/2022, 4:48 PM
/opt/entrypoint.sh ${SPARK_HOME}/bin/spark-submit \
  --name {{workflow.name}} \
  --deploy-mode client \
  --master k8s://https://kubernetes.default.svc:443 \
  --conf spark.kubernetes.node.selector.NodeGroup=spark-worker \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=datalake-sa \
  --conf spark.kubernetes.authenticate.executor.serviceAccountName=datalake-sa \
  --conf spark.hadoop.fs.s3a.assumed.role.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider \
  --conf spark.kubernetes.container.image={{inputs.parameters.spark-image}} \
  --conf spark.kubernetes.driver.pod.name=${POD_NAME} \
  --conf spark.kubernetes.namespace=${POD_NAMESPACE} \
  --conf spark.driver.host=${POD_IP} \
  --conf spark.driver.port=7077 \
  --conf spark.driver.cores={{inputs.parameters.driver-cores}} \
  --conf spark.driver.memory={{inputs.parameters.driver-memory}} \
  --conf spark.kubernetes.driver.limit.cores={{inputs.parameters.driver-cores}} \
  --conf spark.kubernetes.driver.request.cores={{inputs.parameters.driver-cores}} \
  --conf spark.executor.instances={{inputs.parameters.executor-instances}} \
  --conf spark.executor.memory={{inputs.parameters.executor-memory}} \
  --conf spark.executor.cores={{inputs.parameters.executor-cores}} \
  --conf spark.kubernetes.executor.limit.cores={{inputs.parameters.executor-cores}} \
  --conf spark.kubernetes.executor.request.cores={{inputs.parameters.executor-cores}} \
  --conf spark.kubernetes.executor.label.workflow-name={{workflow.name}} \
  --conf spark.kubernetes.executor.deleteOnTermination=true \
  --conf spark.sql.autoBroadcastJoinThreshold=-1 \
  --conf spark.ui.retainedTasks=10 \
  --conf spark.sql.ui.retainedExecutions=1 \
  --conf spark.kubernetes.memoryOverheadFactor=0.2 \
  --conf spark.sql.streaming.metricsEnabled=true \
  --class {{inputs.parameters.spark-class}} \
  --conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem \
  --conf spark.hadoop.fs.lakefs.access.key=XXX \
  --conf "spark.hadoop.fs.lakefs.secret.key=XXX" \
  --conf spark.hadoop.fs.lakefs.endpoint=http://lakefs-int.eu-central-1.dataservices.xxx.cloud/api/v1 \
  --conf spark.hadoop.lakefs.api.url=http://lakefs-int.eu-central-1.dataservices.xxx.cloud/api/v1 \
  --conf spark.hadoop.lakefs.api.access_key=XXX \
  --conf "spark.hadoop.lakefs.api.secret_key=XXX" \
  local:///opt/spark/work-dir/test.jar \
  {{inputs.parameters.job-arguments}}
b

Barak Amar

12/05/2022, 4:51 PM
Is test.jar an uber jar, or do we also need to pass
--packages io.lakefs:hadoop-lakefs-assembly:0.1.9
a

Alessandro Mangone

12/05/2022, 4:52 PM
it’s the uber jar
y

Yoni Augarten

12/05/2022, 5:09 PM
Hey @Alessandro Mangone, are you accessing the FileSystem using the FileContext abstraction? If so, you would need to change all spark.hadoop.fs.lakefs.* configurations to spark.hadoop.fs.AbstractFileSystem.lakefs.*. We haven't tested the lakeFS Hadoop FileSystem with this abstraction, but I hope it all works out 🙂
a

Alessandro Mangone

12/05/2022, 5:38 PM
No, I am not - I am configuring it the way I pasted here, same as in your tutorials, but tomorrow I’ll perform some additional tests
y

Yoni Augarten

12/05/2022, 5:39 PM
I'm asking about the code accessing the FileSystem, not the configurations. Can you share the code?
Alternatively, the full stacktrace leading to:
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.lakefs.impl=null: No AbstractFileSystem configured for scheme: lakefs
a

Alessandro Mangone

12/06/2022, 8:02 AM
I am not doing anything strange with the filesystem except configuring it in the spark config and then using it to write some data as in
df.write.format("delta").save("lakefs://myrepo/mybranch/sometable")
Here you can find the stacktrace
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:245)
        at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$3(TransactionalWrite.scala:342)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:297)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:245)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:101)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:212)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:209)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:101)
        at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:317)
        at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:98)
        at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:91)
        at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:221)
        at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:91)
        at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:159)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at cloud.docebo.dataservices.LakeFSMain$.$anonfun$main$1(LakeFSMain.scala:40)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at sparktest.LakeFSMain$.main(LakeFSMain.scala:29)
        at sparktest.LakeFSMain.main(LakeFSMain.scala)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.lakefs.LakeFSFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.parquet.hadoop.util.HadoopOutputFile.fromPath(HadoopOutputFile.java:58)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:480)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:420)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:409)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:155)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:317)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:256)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: Class io.lakefs.LakeFSFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
        ... 25 more
y

Yoni Augarten

12/06/2022, 8:08 AM
Hey @Alessandro Mangone, thank you for the details! This seems like a different error, and it means that the uber-JAR is not properly loaded into the classpath. Could you please try to download it from the following link, and include it in your submit command using the
--jars
flag? http://treeverse-clients-us-east.s3-website.us-east-1.amazonaws.com/hadoop/hadoop-lakefs-assembly-0.1.9.jar
a

Alessandro Mangone

12/06/2022, 8:16 AM
I definitely need to review my assembly strategy, because by adding it with the --jars flag I get new nonsense errors 😐
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
y

Yoni Augarten

12/06/2022, 8:19 AM
The lakeFS FileSystem relies on having hadoop-aws in your classpath. We don't include it in our uber-jar since it is specific to your Hadoop version.
a

Alessandro Mangone

12/06/2022, 8:21 AM
but it’s there 🙂 I am already using this codebase without any problem and I’m trying to integrate lakefs
I’ll review how I create the uberjar
Is there maybe any file in META-INF that should be preserved?
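(A side note on the META-INF question: Hadoop discovers FileSystem implementations through META-INF/services entries, so a merge strategy that discards all of META-INF can drop those registrations. A possible build.sbt sketch, assuming sbt-assembly 1.x - adjust to whatever strategy the project already uses:)

// build.sbt (sketch): keep service-loader registrations when building the uber jar
assembly / assemblyMergeStrategy := {
  // Merge service files such as org.apache.hadoop.fs.FileSystem line by line
  case PathList("META-INF", "services", _*) => MergeStrategy.filterDistinctLines
  // Drop the rest of META-INF (manifests, signatures)
  case PathList("META-INF", _*)             => MergeStrategy.discard
  case _                                    => MergeStrategy.first
}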
y

Yoni Augarten

12/06/2022, 8:23 AM
You're creating an uber-jar for your project which includes the lakeFS FileSystem as a dependency?
a

Alessandro Mangone

12/06/2022, 8:23 AM
yes
y

Yoni Augarten

12/06/2022, 8:23 AM
Perhaps you are shading hadoop-aws?
a

Alessandro Mangone

12/06/2022, 8:23 AM
val devDependencies = Seq(
  "org.apache.hadoop" % "hadoop-aws" % "3.3.2",
  "org.apache.hadoop" % "hadoop-common" % "3.3.2",
  "com.amazonaws" % "aws-java-sdk" % awsSDKVersion,
  "com.amazonaws" % "aws-java-sdk-bundle" % awsSDKVersion,
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-avro" % sparkVersion,
  //"org.apache.spark" %% "spark-hadoop-cloud" % sparkVersion,
  "io.delta" %% "delta-core" % "2.1.0",
  "io.confluent" % "kafka-schema-registry-client" % "7.2.2",
  "com.lihaoyi" %% "requests" % "0.7.1",
  "com.lihaoyi" %% "upickle" % "2.0.0",
  "com.lihaoyi" %% "os-lib" % "0.8.1",
  "com.github.scopt" %% "scopt" % "4.1.0",
  "software.amazon.msk" % "aws-msk-iam-auth" % "1.1.5",
  "io.lakefs" % "hadoop-lakefs-assembly" % "0.1.9",
  //"io.lakefs" %% "lakefs-spark-client-312-hadoop3" % "0.5.1"
)
no not really, I don’t have any shade rules in place
y

Yoni Augarten

12/06/2022, 8:26 AM
Can you check that hadoop-aws is indeed included in your final jar?
I agree that it should be, according to the dependencies.
a

Alessandro Mangone

12/06/2022, 8:29 AM
yes I can confirm it’s there
y

Yoni Augarten

12/06/2022, 8:30 AM
And you're running spark-submit with --master local?
a

Alessandro Mangone

12/06/2022, 8:31 AM
No, I’m running Spark on k8s, so --master k8s://https://kubernetes.default.svc:443
y

Yoni Augarten

12/06/2022, 8:31 AM
Right, sorry, I misread the thread. Thank you for confirming. I would like to take some time to try and reproduce this. I will get back to you here with an answer.
By the way, can you explain why you are using --deploy-mode client?
a

Alessandro Mangone

12/06/2022, 8:39 AM
to have the driver running in the pod itself
y

Yoni Augarten

12/06/2022, 8:40 AM
Can you please share the stacktrace leading to the error?
I want to determine whether it happens on the driver or workers
a

Alessandro Mangone

12/06/2022, 8:48 AM
Exception in thread "main" java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.spark.util.DependencyUtils$.resolveGlobPath(DependencyUtils.scala:317)
        at org.apache.spark.util.DependencyUtils$.$anonfun$resolveGlobPaths$2(DependencyUtils.scala:273)
        at org.apache.spark.util.DependencyUtils$.$anonfun$resolveGlobPaths$2$adapted(DependencyUtils.scala:271)
        at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
        at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
        at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
        at org.apache.spark.util.DependencyUtils$.resolveGlobPaths(DependencyUtils.scala:271)
        at org.apache.spark.deploy.SparkSubmit.$anonfun$prepareSubmitEnvironment$4(SparkSubmit.scala:364)
        at scala.Option.map(Option.scala:230)
        at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:364)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:901)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
y

Yoni Augarten

12/06/2022, 8:55 AM
It seems like a failure while preparing the submit environment. Are you providing any of the spark-submit arguments as paths under s3a?
a

Alessandro Mangone

12/06/2022, 8:57 AM
Yes, the lakeFS jar, as you suggested - I’m trying to load it from S3.
If I use a path like s3:// I get the error “unsupported filesystem s3”.
y

Yoni Augarten

12/06/2022, 8:58 AM
For now, let's try to download it to your local machine and include it as a local path
Once we get it working we will solve the download thing
a

Alessandro Mangone

12/06/2022, 8:59 AM
Actually, I tried using the HTTP link and it works now:
👍🏻 1
--jars http://treeverse-clients-us-east.s3-website.us-east-1.amazonaws.com/hadoop/hadoop-lakefs-assembly-0.1.9.jar \
So pre-loading it inside my Docker image and using --jars local:// might work.
Actually, it failed again, complaining about Delta writing its LogStore on a non-Hadoop filesystem:
22/12/06 09:01:35 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
Exception in thread "main" java.io.IOException: The error typically occurs when the default LogStore implementation, that
is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
In order to get the transactional ACID guarantees on table updates, you have to use the
correct implementation of LogStore that is appropriate for your storage system.
See https://docs.delta.io/latest/delta-storage.html for details.

....

Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.lakefs.impl=null: No AbstractFileSystem configured for scheme: lakefs
        at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:177)
        at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:266)
        at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342)
        at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/javax.security.auth.Subject.doAs(Unknown Source)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
        at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
        at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
        at io.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.java:90)
        ... 94 more
y

Yoni Augarten

12/06/2022, 9:22 AM
It seems that this version of Delta does use the FileContext abstraction I mentioned before. I haven't run into this before. You will need to add spark.hadoop.fs.AbstractFileSystem.lakefs.* parameters with the same values as your corresponding spark.hadoop.fs.lakefs.* configurations.
a

Alessandro Mangone

12/06/2022, 9:53 AM
Sorry it took me so long to retry, but after also configuring the AbstractFileSystem properties I get a new error:
Caused by: java.lang.NoSuchMethodException: io.lakefs.LakeFSFileSystem.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)
        at java.base/java.lang.Class.getConstructor0(Unknown Source)
        at java.base/java.lang.Class.getDeclaredConstructor(Unknown Source)
        at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:139)
        ... 104 more
If that matters, I am using the 2.1.0 open-source version of Delta
y

Yoni Augarten

12/06/2022, 10:00 AM
It seems like Delta uses this FileContext abstraction in order to achieve atomicity over HDFS when writing the Delta log. Let's try to use another LogStore.
Can you try to add the following configuration:
spark.delta.logStore.lakefs.impl=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
If it doesn't work, I'll try to spin up a cluster with open source delta on my end
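(In Scala, the suggested setting would sit alongside the other lakeFS properties, roughly like this - endpoint and keys are placeholders:)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
  .config("spark.hadoop.fs.lakefs.endpoint", "https://lakefs.example.com/api/v1")
  .config("spark.hadoop.fs.lakefs.access.key", "<access-key-id>")
  .config("spark.hadoop.fs.lakefs.secret.key", "<secret-access-key>")
  // Per-scheme Delta LogStore override for lakefs:// paths, as suggested above
  .config("spark.delta.logStore.lakefs.impl",
    "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
  .getOrCreate()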
a

Alessandro Mangone

12/06/2022, 10:03 AM
Will do, I was looking at the same docs page 🙂
👍🏻 1
Thanks so much! It is working now!
y

Yoni Augarten

12/06/2022, 10:08 AM
We've come a long way 🙂
You're welcome
a

Alessandro Mangone

12/06/2022, 10:10 AM
I think this should be added to the docs as an additional setting for when the lakeFS Hadoop FileSystem is used with Delta?
y

Yoni Augarten

12/06/2022, 10:10 AM
Absolutely, I will open an issue for this
👍 1
a

Alessandro Mangone

12/06/2022, 10:23 AM
I removed all the additional --jars/--conf, etc., and can confirm it’s not a problem with the uber jar. Adding the LogStore implementation is sufficient.