# help
e
Hi guys. I'm new to lakeFS in general, but I really like the idea and the features so far! I'm investigating it for production usage in my company. I have a minikube installation with lakeFS deployed on top of Azure Data Lake Gen2, and some Spark applications in Python running on top of this. I tried to start using Delta tables with Spark and ran into some problems. Spark is set up with presigned mode against lakeFS as stated in https://docs.lakefs.io/integrations/spark.html; in other words, I use the lakefs:// protocol for reading and writing Parquet. When using Delta tables it didn't work as expected. I'm using Spark 3.4.0, the delta-core 2.4.0 and delta-storage 2.4.0 jars, and the delta-spark 2.4.0 pip package. When reading a Parquet file and writing it back in Delta form I get these errors:

1) `org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.lakefs.impl=null: No AbstractFileSystem configured for scheme: lakefs`
2) `ERROR:root:An error occurred while calling o74.save. : java.io.IOException: The error typically occurs when the default LogStore implementation, that is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system. In order to get the transactional ACID guarantees on table updates, you have to use the correct implementation of LogStore that is appropriate for your storage system. See https://docs.delta.io/latest/delta-storage.html for details.`

Any advice on how to fix it?
j
Hi @Eirik Knævelsrud, regarding your first question, can you share your Spark configuration (obscuring confidential values)? Regarding the second question, we currently don't have a lakeFS LogStore implementation. What about trying the S3-compatible lakeFS endpoint on Azure?
e
Sure. For deploying my applications I'm using the Spark Operator:

```yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: test-delta
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "test-delta:v1"
  imagePullPolicy: IfNotPresent
  mainApplicationFile: local:///app/test.py
  deps:
    jars:
      - "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar"
      - "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.12.69/aws-java-sdk-1.12.69.jar"
      - "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.12.69/aws-java-sdk-core-1.12.69.jar"
      - "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.12.69/aws-java-sdk-s3-1.12.69.jar"
      - "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.12.69/aws-java-sdk-dynamodb-1.12.69.jar"
      - "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.69/aws-java-sdk-bundle-1.12.69.jar"
      - "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-sts/1.12.69/aws-java-sdk-sts-1.12.69.jar"
      - "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.4.0/delta-core_2.12-2.4.0.jar"
      - "https://repo1.maven.org/maven2/io/delta/delta-storage/2.4.0/delta-storage-2.4.0.jar"
  sparkVersion: "3.4.0"
  sparkConf:
    spark.driver.extraJavaOptions: "-Divy.cache.dir=/tmp -Divy.home=/tmp"
    spark.jars.packages: "io.lakefs:hadoop-lakefs-assembly:0.1.15"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "4g"
    labels:
      version: 3.4.0
    serviceAccount: spark
  executor:
    cores: 2
    instances: 1
    memory: "4g"
    labels:
      version: 3.4.0
```

For my Python application I have this setup (after `pip install delta-spark==2.4.0`):

```python
import logging

from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, BooleanType
from delta import *  # importing the required configure_spark_with_delta_pip function

customersTablePath = "lakefs://rep/main/table"

# Configuring the Spark session with Delta Lake
builder = (
    SparkSession.builder.appName("LakeFS-test")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
sc = configure_spark_with_delta_pip(builder).getOrCreate()

# lakeFS Hadoop filesystem in presigned mode
sc._jsc.hadoopConfiguration().set("fs.lakefs.access.mode", "presigned")
sc._jsc.hadoopConfiguration().set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
sc._jsc.hadoopConfiguration().set("fs.lakefs.access.key", "key")
sc._jsc.hadoopConfiguration().set("fs.lakefs.secret.key", "secret")
sc._jsc.hadoopConfiguration().set("fs.lakefs.endpoint", "http://test.default:80/api/v1")

try:
    df = sc.read.parquet("lakefs://test/main/test.parquet")
    df.write.format("delta").mode("overwrite").save(customersTablePath)
    df.show(10)
except Exception as e:
    logging.error(e)
```

[5:30 PM] Eirik Knævelsrud
https://docs.lakefs.io/integrations/spark.html#configuring-azure-databricks-with-the-s3-compatible-api
> Apache Spark: Accessing data in lakeFS from Apache Spark works the same as accessing S3 data from Apache Spark.

So what you are saying is that this configuration, using presigned mode and the lakefs:// protocol, is not supported for Delta Lake yet because you don't have a LogStore yet? So you recommend that I test the s3a protocol instead? Then I just add the s3a endpoint, secret key, and access key? Plus I need to add some dependent jars like hadoop-azure?
j
To test Delta with lakeFS with your current configuration I'll need some time. In the meantime you can test it using s3a. I can see that you already have `hadoop-aws`, so you're basically good to go. Use your lakeFS server as your `s3a.endpoint` value (without the `api/v1` suffix), and use your lakeFS access key and secret as your `fs.s3a` access key and secret (as mentioned in the docs).
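For readers following this thread, here is a minimal sketch of what that s3a setup could look like in PySpark, assembled from the advice above. The endpoint URL, credentials, and repository/branch names are placeholders taken from the earlier examples, not a verified configuration.

```python
# Sketch: pointing Spark's s3a filesystem at lakeFS's S3-compatible endpoint, then writing Delta.
# Endpoint, keys, and paths below are placeholders from this thread.
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder.appName("LakeFS-s3a-test")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

hconf = spark._jsc.hadoopConfiguration()
hconf.set("fs.s3a.endpoint", "http://test.default:80")  # lakeFS server, without the /api/v1 suffix
hconf.set("fs.s3a.access.key", "key")                   # lakeFS access key
hconf.set("fs.s3a.secret.key", "secret")                # lakeFS secret key
hconf.set("fs.s3a.path.style.access", "true")           # the S3 gateway expects path-style requests
hconf.set("fs.s3a.connection.ssl.enabled", "false")     # assumption: plain-HTTP endpoint in this minikube setup

# Through the S3-compatible endpoint, paths take the form s3a://<repository>/<branch>/<path>
df = spark.read.parquet("s3a://test/main/test.parquet")
df.write.format("delta").mode("overwrite").save("s3a://rep/main/table")
```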
Let me know if that helped.
e
Ok, thank you. In Norway it's getting late now, I'll message you tomorrow. Thanks for helping!
j
Sure thing.
e
Ok, so that worked, thanks a lot. Though, I needed to upgrade https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar to https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.2/hadoop-aws-3.2.2.jar, as I got this error: `java.lang.NoSuchMethodError: 'void org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(com.google.common.util.concurrent.ListeningExecutorService, int, boolean)'` (https://github.com/aws/aws-sdk-java/issues/2510 - which is related to Spark and not lakeFS).
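As an aside for anyone hitting the same `NoSuchMethodError`: it usually means the `hadoop-aws` jar doesn't match the Hadoop version Spark was built against. A quick, hedged way to check from PySpark is sketched below; the version numbers in the comments are illustrative, not a recommendation for this setup.

```python
# Sketch: find the Hadoop version bundled with Spark, then pin hadoop-aws to the same version.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hadoop-version-check").getOrCreate()

# org.apache.hadoop.util.VersionInfo ships with Hadoop itself; this prints e.g. "3.2.2"
hadoop_version = spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()
print(f"Hadoop on the classpath: {hadoop_version}")

# hadoop-aws should then be pinned to the same version, e.g. in the SparkApplication deps.jars list
# or via spark.jars.packages (coordinate shown as an example only):
#   org.apache.hadoop:hadoop-aws:<hadoop_version>
```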
j
Yep, you have got to have your Hadoop dependencies aligned… I’m glad it worked! If you have further questions please let us know 🙏
👏 1