# help
e
Hi again guys! Really like lakeFS. I have a problem regarding reading and writing Delta tables using Spark. It works, but Spark is huge and complex, and I will try everything to avoid it. So my question to you guys is: is there any other way to read and write Delta tables to lakeFS while avoiding Spark?
i
Hi @Eirik Knævelsrud, welcome to the lake 👋 Delta tables can be written to lakeFS regardless of the compute engine (Spark in this case). How do you normally read/write Delta tables without Spark? Have you tried writing them to lakeFS in the same manner? Could you expand some more on your use-case?
r
there are a bunch of integrations listed here that you could use https://delta.io/integrations
e
Ok, thank you for the fast response. I'll do it the Spark way for now, using this code:
Copy code
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "access key")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "secret key")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://example-lakefs.default:80")
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

df = spark.read.parquet(parquet_path)
df.write.format("delta").mode("overwrite").save(delta_save_path)
So I'm using Python at this point and will continue with that. I'm looking for the most lightweight way of doing this logic. Thanks for the link to the integrations; the most lightweight one I can see is this: https://github.com/delta-io/delta-rs. Any experience? (Or other approaches?)
l
As Itai mentioned, it would be great if you could elaborate a bit so we can fully understand and help with your use-case 🙂 How are you currently using Delta tables (regardless of lakeFS)? Do you mean you are using PySpark? And what do you mean by looking for the most lightweight approach for doing this logic?
e
To start: I'm totally new to this, so don't shoot me if it's a stupid question. Yes, I'm using PySpark. I use PySpark to read the parquet file and to write that DataFrame as a Delta table. I have Azure Data Lake as the foundation and lakeFS on top of it. So my question is: how can I do this without Spark or any other heavier systems? My desired approach would be something like:
1. pip install a Python package
2. Set up a connection to lakeFS without Spark (use boto3?)
3. pythonPackage.readParquet
4. pythonPackage.writeAsDelta
o
This could be useful (using Delta-rs with lakeFS' S3 Gateway):
Copy code
from deltalake import DeltaTable

# Replace with your lakeFS configuration and credentials
lakefs = {
    "AWS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE",
    "AWS_SECRET_ACCESS_KEY": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    "AWS_ENDPOINT_URL": "<https://my.lakefs.installation.com>", 
    "AWS_REGION": "us-east-1"
}

dt = DeltaTable("<s3://my-repo/branch/path/to/delta_table/>", storage_options=lakefs)
df = dt.to_pandas()
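The write direction isn't shown above; a minimal sketch along the same lines (illustrative, not from the original message — the table path below is a placeholder) would reuse the same storage options with delta-rs' write_deltalake:
Copy code
from deltalake import write_deltalake

# Write a pandas DataFrame back to a lakeFS branch as a Delta table,
# reusing the "lakefs" storage-options dict defined above.
# Note: without an S3 locking provider, delta-rs may also need
# "AWS_S3_ALLOW_UNSAFE_RENAME": "true" in the storage options
# (this comes up further down the thread).
write_deltalake(
    "s3://my-repo/branch/path/to/new_delta_table",
    df,
    mode="overwrite",
    storage_options=lakefs,
)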
e
Yes @Oz Katz, I'm looking for something like this. For now I just have lakeFS installed in a local minikube environment with a port-forward on the lakeFS service; in other words, I'm hosting lakeFS locally. What do I use for the auth info then?
o
Probably the same values you used for Spark (access key, secret, and endpoint). The region is likely us-east-1, unless you explicitly changed the S3 gateway configuration in your lakeFS server.
r
I'm trying to build a notebook for this to go in https://github.com/treeverse/lakeFS-samples/ but can't get it to work…
Copy code
storage_options = {"AWS_ACCESS_KEY_ID": lakefsAccessKey, 
                   "AWS_SECRET_ACCESS_KEY":lakefsSecretKey,
                   "AWS_ENDPOINT": lakefsEndPoint,
                   "AWS_STORAGE_ALLOW_HTTP": "true"
                  }
Copy code
{'AWS_ACCESS_KEY_ID': 'AKIAIOSFOLKFSSAMPLES',
 'AWS_SECRET_ACCESS_KEY': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
 'AWS_ENDPOINT': 'http://lakefs:8000',
 'AWS_STORAGE_ALLOW_HTTP': 'true'}
Copy code
deltalake.write_deltalake(table_or_uri='s3a://example/main/userdata/', 
                          data = dataframe,
                          storage_options=storage_options)
Fails with
Copy code
Generic S3 error: response error "<?xml version="1.0" encoding="UTF-8"?>
<Error><Code>MissingFields</Code><Message>Missing fields in request.</Message><Resource></Resource><Region>us-east-1</Region><RequestId>fc5ae35f-aaa6-4891-b0e4-715b574c894a</RequestId><HostId>79C9987FC956D83E</HostId></Error>", after 0 retries: HTTP status client error (400 Bad Request) for url (http://lakefs:8000/example/main/userdata/_delta_log/_last_checkpoint)
in the lakeFS log is
Copy code
level=error msg="log header does not match v2 structure" func="pkg/gateway/sig.(*V2SigAuthenticator).Parse" file="build/pkg/gateway/sig/v2.go:94" header=Authorization

level=warning msg="failed to parse signature" func=pkg/gateway.AuthenticationHandler.func1 file="build/pkg/gateway/middleware.go:41" error=MissingFields
any ideas?
o
I wonder why sigv2 is even getting used here. Which version of Python / Delta-rs / lakeFS are you using? Also, you probably need to set AWS_REGION for this to work. Where is AWS_STORAGE_ALLOW_HTTP coming from? I'm not familiar with that config value.
r
Copy code
Python version 3.10.10 | packaged by conda-forge | (main, Mar 24 2023, 19:56:21) [GCC 11.3.0] 
deltalake version 0.10.1
lakeFS version 0.106.0
AWS_STORAGE_ALLOW_HTTP is buried in the S3 crate documentation; I stumbled across it in a delta-rs GH issue
okay this worked:
Copy code
storage_options = {"AWS_ACCESS_KEY_ID": lakefsAccessKey, 
                   "AWS_SECRET_ACCESS_KEY":lakefsSecretKey,
                   "AWS_ENDPOINT": lakefsEndPoint,
                   "AWS_REGION": "us-east-1",
                   "AWS_STORAGE_ALLOW_HTTP": "true",
                   "AWS_S3_ALLOW_UNSAFE_RENAME": "true"
                  }
@Eirik Knævelsrud you can see working code example here: https://github.com/treeverse/lakeFS-samples/blob/428128b71c733efb4f586a8130db829fb44a1f09/00_notebooks/delta-lake-python.ipynb Let me know if this is different from what you're trying to do
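For reference, a compact, illustrative sketch of the write-then-read round trip with those options (the repo/branch path is a placeholder, the lakefs* variables are assumed to be defined as above, and the linked notebook is the tested version):
Copy code
import pandas as pd
from deltalake import DeltaTable, write_deltalake

storage_options = {"AWS_ACCESS_KEY_ID": lakefsAccessKey,
                   "AWS_SECRET_ACCESS_KEY": lakefsSecretKey,
                   "AWS_ENDPOINT": lakefsEndPoint,
                   "AWS_REGION": "us-east-1",
                   "AWS_STORAGE_ALLOW_HTTP": "true",      # endpoint is plain http
                   "AWS_S3_ALLOW_UNSAFE_RENAME": "true"   # no S3 locking provider configured
                  }

# write a small DataFrame as a Delta table to a lakeFS branch via the S3 gateway
df = pd.DataFrame({"id": [1, 2, 3]})
write_deltalake("s3a://example/main/demo_table", df, storage_options=storage_options)

# read it back
dt = DeltaTable("s3a://example/main/demo_table", storage_options=storage_options)
print(dt.to_pandas())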
lakefs unicorn 1
e
Ok, thank you very much! I'll check it tomorrow morning and will give you an update :) At least it's the approach I wanted.
got this error:
Copy code
Traceback (most recent call last):
  File "pathToPythonFile\test2.py", line 61, in <module>
    deltalake.write_deltalake(table_or_uri='http://s3a://bucketr/branch/deltaTable',
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\project\.venv\Lib\site-packages\deltalake\writer.py", line 148, in write_deltalake
    table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\project\.venv\Lib\site-packages\deltalake\writer.py", line 393, in try_get_table_and_table_uri
    table = try_get_deltatable(table_or_uri, storage_options)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\project\.venv\Lib\site-packages\deltalake\writer.py", line 406, in try_get_deltatable
    return DeltaTable(table_uri, storage_options=storage_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\project\.venv\Lib\site-packages\deltalake\table.py", line 238, in __init__
    self._table = RawDeltaTable(
                  ^^^^^^^^^^^^^^
_internal.DeltaError: Delta protocol violation: Failed to serialize operation: expected value at line 1 column 1
r
at a glance, table_or_uri='http://s3a://bucketr/branch/deltaTable' should probably be table_or_uri='s3a://bucketr/branch/deltaTable'
e
I don't know why Slack formatted it with http://. That's not the case in my code.
r
can you share the code you're running?
e
for sure:
Copy code
import fsspec
import pandas as pd
import deltalake
import lakefs_client
from lakefs_client.models import *
from lakefs_client.client import LakeFSClient
import boto3
from lakefs_client.api import objects_api
from deltalake.writer import write_deltalake
import s3fs
import pyarrow as pa

lakefsEndPoint = 'http://127.0.0.1:59786/'  # MINIKUBE SERVICE lakefs-service -n namespace (Forwarding port etc)
lakefsAccessKey = 'key'
lakefsSecretKey = 'secret'
repo_name = "internal"

import os
os.environ["AWS_ACCESS_KEY_ID"] = lakefsAccessKey
os.environ["AWS_SECRET_ACCESS_KEY"] = lakefsSecretKey
os.environ["AWS_ENDPOINT"] = lakefsEndPoint
os.environ["AWS_REGION"] = "us-east-1"  # Not sure about this one but just did what you said.

# lakeFS credentials and endpoint
configuration = lakefs_client.Configuration()
configuration.username = lakefsAccessKey
configuration.password = lakefsSecretKey
configuration.host = lakefsEndPoint
lakefs = LakeFSClient(configuration)

storage_options = {"AWS_ACCESS_KEY_ID": lakefsAccessKey,
                   "AWS_SECRET_ACCESS_KEY": lakefsSecretKey,
                   "AWS_ENDPOINT": lakefsEndPoint,
                   "AWS_REGION": "us-east-1",  # Not sure about this one but just did what you said.
                   "AWS_STORAGE_ALLOW_HTTP": "true",
                   "AWS_S3_ALLOW_UNSAFE_RENAME": "true"
                  }

fs = s3fs.S3FileSystem(
    endpoint_url="http://127.0.0.1:59786/",
    secret=lakefsSecretKey,
    key=lakefsAccessKey
)

# This works as expected
with fs.open('s3a://repository/branch/lakes.parquet', 'rb') as f:
    df = pd.read_parquet(f)

df = df.astype(str)
print(df.head())

# Initialize the Delta table
deltalake.write_deltalake(table_or_uri='s3a://repository/branch/newTable',
                          data=df,
                          mode='overwrite',
                          storage_options=storage_options)
r
hmmm I don't know, sorry. I'd suggest trying the Delta Lake community (e.g. https://github.com/delta-io/delta-rs/issues and also their Slack https://github.com/delta-io/delta#community). I'd also suggest starting from the notebook I shared and modifying it one step at a time towards your code, to see where it starts to break.
e
Yes, I will try. Thank you a lot for the help! I think this is more of a compatibility issue in Delta Lake than in lakeFS.
👍 2