user (12/05/2022, 12:59 PM)
io.lakefs:api-client:0.86.0?

user (12/05/2022, 1:01 PM)
```
"io.lakefs" % "hadoop-lakefs-assembly" % "0.1.9",
"io.lakefs" %% "lakefs-spark-client-312-hadoop3" % "0.5.1"
```
which includes version 0.56.0

user (12/05/2022, 1:09 PM)
With those settings, sbt assembly runs out of heap space. Maybe there's something wrong?

user (12/05/2022, 3:32 PM)
```
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.lakefs.impl=null: No AbstractFileSystem configured for scheme: lakefs
```
But it is set in the Spark config?
("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")

user (12/05/2022, 3:35 PM)
When you pass it to spark-submit --conf, the key is spark.hadoop.fs.lakefs.impl; when you set it from inside Spark, the key is fs.lakefs.impl.

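For illustration, a minimal sketch of the two equivalent spellings (the session setup is hypothetical; it assumes Spark 3.x with the lakeFS assembly on the classpath):
```
import org.apache.spark.sql.SparkSession

// Spark strips the "spark.hadoop." prefix before handing the key to the
// underlying Hadoop Configuration.
val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
  .getOrCreate()

// Equivalent once the session exists: set the bare Hadoop key directly.
spark.sparkContext.hadoopConfiguration
  .set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
```
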
user (12/05/2022, 3:38 PM)
```
SparkSession.builder().master("local[2]").config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
```
This should work as well.

user (12/05/2022, 3:43 PM)
lakefs://, s3:// or s3a://? When I initialize the API client, the scheme is set to lakefs, but my namespace is set to s3://bucket…/mycollection.

user (12/05/2022, 3:43 PM)
lakefs://

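That is, application paths use the lakefs:// scheme (lakefs://&lt;repository&gt;/&lt;ref&gt;/&lt;path&gt;), while the repository's s3:// storage namespace is only what lakeFS itself writes to underneath. A sketch with hypothetical repository and branch names:
```
// Assumes a SparkSession `spark` configured as earlier in the thread.
// "example-repo" and "main" are made-up names for illustration.
val df = spark.read.parquet("lakefs://example-repo/main/mycollection/")
df.write.parquet("lakefs://example-repo/feature-branch/mycollection-out/")
```
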
user (12/05/2022, 3:48 PM)
```
spark-shell --conf spark.hadoop.fs.s3a.access.key='AKIAIOSFODNN7EXAMPLE' \
--conf spark.hadoop.fs.s3a.secret.key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' \
--conf spark.hadoop.fs.s3a.endpoint='https://s3.eu-central-1.amazonaws.com' \
--conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem \
--conf spark.hadoop.fs.lakefs.access.key=AKIAlakefs12345EXAMPLE \
--conf spark.hadoop.fs.lakefs.secret.key=abc/lakefs/1234567bPxRfiCYEXAMPLEKEY \
--conf spark.hadoop.fs.lakefs.endpoint=https://lakefs.example.com/api/v1 \
--packages io.lakefs:hadoop-lakefs-assembly:0.1.8
...
```

user (12/05/2022, 4:01 PM)
```
Caused by: java.lang.ClassNotFoundException: Class io.lakefs.LakeFSFileSystem not found
```

user (12/05/2022, 4:11 PM)
```
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.lakefs.impl=null: No AbstractFileSystem configured for scheme: lakefs
```
These are the flags I'm passing:
```
--conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem \
--conf spark.hadoop.fs.lakefs.access.key=XXX \
--conf "spark.hadoop.fs.lakefs.secret.key=XXX" \
--conf spark.hadoop.fs.lakefs.endpoint=http://lakefs-int.eu-central-1.dataservices.xxx.cloud/api/v1 \
--conf spark.hadoop.lakefs.api.url=http://lakefs-int.eu-central-1.dataservices.xxx.cloud/api/v1 \
--conf spark.hadoop.lakefs.api.access_key=XXX \
--conf "spark.hadoop.lakefs.api.secret_key=XXX" \
```

user (12/05/2022, 4:48 PM)
```
/opt/entrypoint.sh ${SPARK_HOME}/bin/spark-submit \
--name {{workflow.name}} \
--deploy-mode client \
--master k8s://https://kubernetes.default.svc:443 \
--conf spark.kubernetes.node.selector.NodeGroup=spark-worker \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=datalake-sa \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=datalake-sa \
--conf spark.hadoop.fs.s3a.assumed.role.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider \
--conf spark.kubernetes.container.image={{inputs.parameters.spark-image}} \
--conf spark.kubernetes.driver.pod.name=${POD_NAME} \
--conf spark.kubernetes.namespace=${POD_NAMESPACE} \
--conf spark.driver.host=${POD_IP} \
--conf spark.driver.port=7077 \
--conf spark.driver.cores={{inputs.parameters.driver-cores}} \
--conf spark.driver.memory={{inputs.parameters.driver-memory}} \
--conf spark.kubernetes.driver.limit.cores={{inputs.parameters.driver-cores}} \
--conf spark.kubernetes.driver.request.cores={{inputs.parameters.driver-cores}} \
--conf spark.executor.instances={{inputs.parameters.executor-instances}} \
--conf spark.executor.memory={{inputs.parameters.executor-memory}} \
--conf spark.executor.cores={{inputs.parameters.executor-cores}} \
--conf spark.kubernetes.executor.limit.cores={{inputs.parameters.executor-cores}} \
--conf spark.kubernetes.executor.request.cores={{inputs.parameters.executor-cores}} \
--conf spark.kubernetes.executor.label.workflow-name={{workflow.name}} \
--conf spark.kubernetes.executor.deleteOnTermination=true \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.ui.retainedTasks=10 \
--conf spark.sql.ui.retainedExecutions=1 \
--conf spark.kubernetes.memoryOverheadFactor=0.2 \
--conf spark.sql.streaming.metricsEnabled=true \
--class {{inputs.parameters.spark-class}} \
--conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem \
--conf spark.hadoop.fs.lakefs.access.key=XXX \
--conf "spark.hadoop.fs.lakefs.secret.key=XXX" \
--conf spark.hadoop.fs.lakefs.endpoint=http://lakefs-int.eu-central-1.dataservices.xxx.cloud/api/v1 \
--conf spark.hadoop.lakefs.api.url=http://lakefs-int.eu-central-1.dataservices.xxx.cloud/api/v1 \
--conf spark.hadoop.lakefs.api.access_key=XXX \
--conf "spark.hadoop.lakefs.api.secret_key=XXX" \
local:///opt/spark/work-dir/test.jar \
{{inputs.parameters.job-arguments}}
```

user (12/05/2022, 4:51 PM)
--packages io.lakefs:hadoop-lakefs-assembly:0.1.9

user (12/05/2022, 5:09 PM)
Try copying your spark.hadoop.fs.lakefs.* configurations to spark.hadoop.fs.AbstractFileSystem.lakefs.*. We haven't tested the lakeFS Hadoop FileSystem through this abstraction, but I hope it all works out 🙂

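A sketch of that suggestion in code, mirroring each key on the session's Hadoop configuration (untested, as the message says; the key list is illustrative). Note that the thread later hits a NoSuchMethodException on this path, because Hadoop's AbstractFileSystem loader instantiates the configured class through a (URI, Configuration) constructor that a plain FileSystem implementation does not provide:
```
// `spark` is the active SparkSession from earlier in the thread.
// Mirror every fs.lakefs.* key under fs.AbstractFileSystem.lakefs.* so that
// the FileContext API can also resolve the lakefs scheme.
val hadoopConf = spark.sparkContext.hadoopConfiguration
for (suffix <- Seq("impl", "access.key", "secret.key", "endpoint")) {
  Option(hadoopConf.get(s"fs.lakefs.$suffix")).foreach { value =>
    hadoopConf.set(s"fs.AbstractFileSystem.lakefs.$suffix", value)
  }
}
```
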
user (12/05/2022, 5:40 PM)
```
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.lakefs.impl=null: No AbstractFileSystem configured for scheme: lakefs
```

user (12/06/2022, 8:02 AM)
```
df.write.format('delta').save('lakefs://myrepo/mybranch/sometable')
```
Here you can find the stack trace:
```
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:245)
at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$3(TransactionalWrite.scala:342)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:297)
at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:245)
at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:101)
at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:212)
at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:209)
at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:101)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:317)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:98)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:91)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:221)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:91)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:159)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.ap…
```

user (12/06/2022, 8:08 AM)
Can you try the --jars flag?
http://treeverse-clients-us-east.s3-website.us-east-1.amazonaws.com/hadoop/hadoop-lakefs-assembly-0.1.9.jar

user (12/06/2022, 8:16 AM)
```
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
```

user (12/06/2022, 8:23 AM)
Is there something in META-INF that should be preserved?

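For context: Hadoop auto-registers FileSystem implementations through ServiceLoader files under META-INF/services, so an assembly merge strategy that discards META-INF wholesale can break scheme discovery (though a ClassNotFoundException usually points at the class itself missing from the classpath rather than at a lost service file). A common build.sbt sketch, assuming sbt-assembly, and not the poster's actual configuration:
```
// Keep ServiceLoader registrations such as
// META-INF/services/org.apache.hadoop.fs.FileSystem by merging their lines,
// while discarding the rest of META-INF as usual.
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", "services", _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF", _*)             => MergeStrategy.discard
  case _                                    => MergeStrategy.first
}
```
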
user (12/06/2022, 8:23 AM)
```
val devDependencies = Seq(
"org.apache.hadoop" % "hadoop-aws" % "3.3.2",
"org.apache.hadoop" % "hadoop-common" % "3.3.2",
"com.amazonaws" % "aws-java-sdk" % awsSDKVersion,
"com.amazonaws" % "aws-java-sdk-bundle" % awsSDKVersion,
"org.apache.spark" %% "spark-core" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
"org.apache.spark" %% "spark-avro" % sparkVersion,
//"org.apache.spark" %% "spark-hadoop-cloud" % sparkVersion,
"io.delta" %% "delta-core" % "2.1.0",
"io.confluent" % "kafka-schema-registry-client" % "7.2.2",
"com.lihaoyi" %% "requests" % "0.7.1",
"com.lihaoyi" %% "upickle" % "2.0.0",
"com.lihaoyi" %% "os-lib" % "0.8.1",
"com.github.scopt" %% "scopt" % "4.1.0",
"software.amazon.msk" % "aws-msk-iam-auth" % "1.1.5",
"io.lakefs" % "hadoop-lakefs-assembly" % "0.1.9",
//"io.lakefs" %% "lakefs-spark-client-312-hadoop3" % "0.5.1"
)
```

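One thing worth noting in this list (an observation, not a confirmed diagnosis of the error above): hadoop-aws 3.3.2 was published against aws-java-sdk-bundle 1.11.1026, and the bundle already contains every AWS service client, so shading both aws-java-sdk and aws-java-sdk-bundle inflates the assembly and invites version clashes. A hedged build.sbt sketch:
```
// Align the AWS SDK bundle with hadoop-aws and drop the unbundled
// aws-java-sdk artifact. The bundle version is an assumption taken from
// hadoop-aws 3.3.2's published dependencies, not the poster's tested setup.
val hadoopVersion = "3.3.2"
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-aws"          % hadoopVersion,
  "org.apache.hadoop" % "hadoop-common"       % hadoopVersion,
  "com.amazonaws"     % "aws-java-sdk-bundle" % "1.11.1026"
)
```
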
user (12/06/2022, 8:30 AM)
--master local?

user (12/06/2022, 8:31 AM)
--master k8s://https://kubernetes.default.svc:443

user (12/06/2022, 8:33 AM)
--deploy-mode client?

user (12/06/2022, 8:48 AM)
```
Exception in thread "main" java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.spark.util.DependencyUtils$.resolveGlobPath(DependencyUtils.scala:317)
at org.apache.spark.util.DependencyUtils$.$anonfun$resolveGlobPaths$2(DependencyUtils.scala:273)
at org.apache.spark.util.DependencyUtils$.$anonfun$resolveGlobPaths$2$adapted(DependencyUtils.scala:271)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
at org.apache.spark.util.DependencyUtils$.resolveGlobPaths(DependencyUtils.scala:271)
at org.apache.spark.deploy.SparkSubmit.$anonfun$prepareSubmitEnvironment$4(SparkSubmit.scala:364)
at scala.Option.map(Option.scala:230)
at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:364)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:901)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
```

user (12/06/2022, 8:57 AM)
With s3:// I get the error "unsupported filesystem s3".

user (12/06/2022, 8:59 AM)
--jars http://treeverse-clients-us-east.s3-website.us-east-1.amazonaws.com/hadoop/hadoop-lakefs-assembly-0.1.9.jar \

user (12/06/2022, 9:00 AM)
--jars local:// might work

user (12/06/2022, 9:08 AM)
```
22/12/06 09:01:35 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
Exception in thread "main" java.io.IOException: The error typically occurs when the default LogStore implementation, that
is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
In order to get the transactional ACID guarantees on table updates, you have to use the
correct implementation of LogStore that is appropriate for your storage system.
See https://docs.delta.io/latest/delta-storage.html for details.
....
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.lakefs.impl=null: No AbstractFileSystem configured for scheme: lakefs
at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:177)
at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:266)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Unknown Source)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
at io.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.java:90)
... 94 more
```

user (12/06/2022, 9:22 AM)
Set the spark.hadoop.fs.AbstractFileSystem.lakefs.* parameters with the same values as your corresponding spark.hadoop.fs.lakefs.* configurations.

user (12/06/2022, 9:53 AM)
```
Caused by: java.lang.NoSuchMethodException: io.lakefs.LakeFSFileSystem.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)
at java.base/java.lang.Class.getConstructor0(Unknown Source)
at java.base/java.lang.Class.getDeclaredConstructor(Unknown Source)
at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:139)
... 104 more
```

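For context: Hadoop's FileContext API instantiates the class named by fs.AbstractFileSystem.&lt;scheme&gt;.impl through a (URI, Configuration) constructor, while io.lakefs.LakeFSFileSystem extends org.apache.hadoop.fs.FileSystem, which only needs a no-argument constructor, so the reflective lookup fails. The usual bridge for a FileSystem-only implementation is a small DelegateToFileSystem subclass; a hypothetical sketch, not part of the lakeFS client at the time of this thread:
```
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.DelegateToFileSystem

// Hypothetical bridge: exposes the FileSystem-based lakeFS client through the
// AbstractFileSystem API that FileContext (and hence HDFSLogStore) requires.
class LakeFS(uri: URI, conf: Configuration)
    extends DelegateToFileSystem(uri, new io.lakefs.LakeFSFileSystem(), conf, "lakefs", false)
```
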
user (12/06/2022, 10:03 AM)
spark.delta.logStore.lakefs.impl=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore

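A sketch of how that setting fits into the session configuration (class name as given above; newer Delta releases expose the same store as io.delta.storage.S3SingleDriverLogStore):
```
import org.apache.spark.sql.SparkSession

// Route Delta's transaction log for the lakefs:// scheme through the
// S3-style LogStore instead of the default HDFSLogStore, which needs an
// AbstractFileSystem binding for the scheme.
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
  .config("spark.delta.logStore.lakefs.impl",
    "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
  .getOrCreate()
```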