#help

Eirik Knævelsrud

08/23/2023, 10:58 AM
Hi again guys! Really like lakeFS. I have a problem regarding reading and writing Delta tables using Spark. It works, but Spark is huge and complex, and I'll try everything to avoid it. So my question to you guys is: is there any other way to read and write Delta tables to lakeFS, avoiding Spark?

Itai Admi

08/23/2023, 11:05 AM
Hi @Eirik Knævelsrud, welcome to the lake 👋 Delta tables can be written to lakeFS regardless of the compute engine (Spark in this case). How do you normally read/write Delta tables without Spark? Have you tried writing them to lakeFS in the same manner? Could you expand some more on your use case?

Robin Moffatt

08/23/2023, 11:06 AM
there are a bunch of integrations listed here that you could use https://delta.io/integrations

Eirik Knævelsrud

08/23/2023, 12:07 PM
Ok, thank you for the fast response. I'll do it the Spark way for now, using this code:
```
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "access key")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "secret key")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://example-lakefs.default:80")
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Read the Parquet file and write it back out as a Delta table
# (reading goes through the SparkSession, not the SparkContext)
df = spark.read.parquet(parquet_path)
df.write.format("delta").mode("overwrite").save(delta_save_path)
```
So I'm using Python at this point and will continue with that. I'm looking for the most lightweight way of doing this logic. Thanks for the link to the integrations; the most lightweight one I can see is https://github.com/delta-io/delta-rs. Any experience? (Or other approaches?)

Lynn Rozen

08/23/2023, 12:29 PM
As Itai mentioned, it would be great if you could elaborate a bit so we can fully understand and help with your use case 🙂 How are you currently using Delta tables (regardless of lakeFS)? And do you mean you are using PySpark? What do you mean by the most lightweight approach of doing this logic?

Eirik Knævelsrud

08/23/2023, 12:34 PM
To start: I'm totally new to this, so don't shoot me if it's a stupid question. Yes, I'm using PySpark. I use PySpark to read the Parquet file and to write that DataFrame as a Delta table. I have Azure Data Lake as the foundation and lakeFS on top of it. So my question is: how can I do this without Spark or any other heavier system? My desired approach would be something like:
1. pip install a Python package
2. Set up a connection to lakeFS without Spark (use boto3?)
3. pythonPackage.readParquet
4. pythonPackage.writeAsDelta

Oz Katz

08/23/2023, 1:13 PM
This could be useful (using Delta-rs with lakeFS' S3 Gateway):
```
from deltalake import DeltaTable

# Replace with your lakeFS configuration and credentials
lakefs = {
    "AWS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE",
    "AWS_SECRET_ACCESS_KEY": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    "AWS_ENDPOINT_URL": "https://my.lakefs.installation.com",
    "AWS_REGION": "us-east-1"
}

dt = DeltaTable("s3://my-repo/branch/path/to/delta_table/", storage_options=lakefs)
df = dt.to_pandas()
```
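Writing back works the same way. A minimal sketch, assuming the same lakefs options dict as above (the table path and DataFrame here are just examples):
```
import pandas as pd
from deltalake import write_deltalake

# Illustrative data; any pandas DataFrame works
df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# Write the DataFrame as a Delta table to a lakeFS branch through the S3 gateway,
# reusing the same credentials/endpoint dict from above
write_deltalake("s3://my-repo/branch/path/to/new_table/", df, mode="overwrite", storage_options=lakefs)
```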

Eirik Knævelsrud

08/23/2023, 1:30 PM
Yes @Oz Katz, I'm looking for something like this. For now I just have lakeFS installed in a local minikube environment with a port-forward on the lakeFS service; in other words, hosting lakeFS locally. What should the auth info be then?

Oz Katz

08/23/2023, 1:32 PM
Probably the same values you used for Spark (access key, secret, and endpoint). The region is likely us-east-1, unless you explicitly changed the S3 gateway configuration in your lakeFS server.

Robin Moffatt

08/23/2023, 1:38 PM
I'm trying to build a notebook for this to go in https://github.com/treeverse/lakeFS-samples/ but can't get it to work…
```
storage_options = {"AWS_ACCESS_KEY_ID": lakefsAccessKey,
                   "AWS_SECRET_ACCESS_KEY": lakefsSecretKey,
                   "AWS_ENDPOINT": lakefsEndPoint,
                   "AWS_STORAGE_ALLOW_HTTP": "true"
                  }
```
```
{'AWS_ACCESS_KEY_ID': 'AKIAIOSFOLKFSSAMPLES',
 'AWS_SECRET_ACCESS_KEY': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
 'AWS_ENDPOINT': 'http://lakefs:8000',
 'AWS_STORAGE_ALLOW_HTTP': 'true'}
```
```
deltalake.write_deltalake(table_or_uri='s3a://example/main/userdata/',
                          data=dataframe,
                          storage_options=storage_options)
```
Fails with
```
Generic S3 error: response error "<?xml version="1.0" encoding="UTF-8"?>
<Error><Code>MissingFields</Code><Message>Missing fields in request.</Message><Resource></Resource><Region>us-east-1</Region><RequestId>fc5ae35f-aaa6-4891-b0e4-715b574c894a</RequestId><HostId>79C9987FC956D83E</HostId></Error>", after 0 retries: HTTP status client error (400 Bad Request) for url (http://lakefs:8000/example/main/userdata/_delta_log/_last_checkpoint)
```
in the lakeFS log is
```
level=error msg="log header does not match v2 structure" func="pkg/gateway/sig.(*V2SigAuthenticator).Parse" file="build/pkg/gateway/sig/v2.go:94" header=Authorization
level=warning msg="failed to parse signature" func=pkg/gateway.AuthenticationHandler.func1 file="build/pkg/gateway/middleware.go:41" error=MissingFields
```
any ideas?

Oz Katz

08/23/2023, 1:58 PM
I wonder why sigv2 is even getting used here. Which version of Python / Delta-rs / lakeFS are you using? Also, you probably need to set AWS_REGION for this to work. Where is AWS_STORAGE_ALLOW_HTTP coming from? I'm not familiar with that config value.

Robin Moffatt

08/23/2023, 2:13 PM
```
Python version 3.10.10 | packaged by conda-forge | (main, Mar 24 2023, 19:56:21) [GCC 11.3.0]
deltalake version 0.10.1
lakeFS version 0.106.0
```
AWS_STORAGE_ALLOW_HTTP is buried in the S3 crate documentation; I stumbled across it in a delta-rs GH issue.
okay this worked:
```
storage_options = {"AWS_ACCESS_KEY_ID": lakefsAccessKey,
                   "AWS_SECRET_ACCESS_KEY": lakefsSecretKey,
                   "AWS_ENDPOINT": lakefsEndPoint,
                   "AWS_REGION": "us-east-1",
                   "AWS_STORAGE_ALLOW_HTTP": "true",
                   "AWS_S3_ALLOW_UNSAFE_RENAME": "true"
                  }
```
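With those options, the same write_deltalake call from above now goes through; only the storage_options changed:
```
import deltalake

# Identical to the earlier failing call; the added AWS_REGION and
# AWS_S3_ALLOW_UNSAFE_RENAME entries are the difference
deltalake.write_deltalake(table_or_uri='s3a://example/main/userdata/',
                          data=dataframe,
                          storage_options=storage_options)
```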
@Eirik Knævelsrud you can see working code example here: https://github.com/treeverse/lakeFS-samples/blob/428128b71c733efb4f586a8130db829fb44a1f09/00_notebooks/delta-lake-python.ipynb Let me know if this is different from what you're trying to do

Eirik Knævelsrud

08/23/2023, 3:20 PM
Ok, thank you very much! I'll check it tomorrow morning and give you an update :) At least it's the way I wanted.
Got this error:
```
Traceback (most recent call last):
  File "pathToPythonFile\test2.py", line 61, in <module>
    deltalake.write_deltalake(table_or_uri='http://s3a://bucketr/branch/deltaTable',
  File "C:\Users\project\.venv\Lib\site-packages\deltalake\writer.py", line 148, in write_deltalake
    table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)
  File "C:\Users\project\.venv\Lib\site-packages\deltalake\writer.py", line 393, in try_get_table_and_table_uri
    table = try_get_deltatable(table_or_uri, storage_options)
  File "C:\Users\project\.venv\Lib\site-packages\deltalake\writer.py", line 406, in try_get_deltatable
    return DeltaTable(table_uri, storage_options=storage_options)
  File "C:\Users\project\.venv\Lib\site-packages\deltalake\table.py", line 238, in __init__
    self._table = RawDeltaTable(
_internal.DeltaError: Delta protocol violation: Failed to serialize operation: expected value at line 1 column 1
```

Robin Moffatt

08/24/2023, 8:09 AM
at a glance, table_or_uri='http://s3a://bucketr/branch/deltaTable' should probably be table_or_uri='s3a://bucketr/branch/deltaTable'

Eirik Knævelsrud

08/24/2023, 8:13 AM
Don't know why Slack formatted it with http://. That's not the case in my code.

Robin Moffatt

08/24/2023, 8:14 AM
Can you share the code you're running?

Eirik Knævelsrud

08/24/2023, 8:19 AM
for sure:
```
import fsspec
import pandas as pd
import deltalake
import lakefs_client
from lakefs_client.models import *
from lakefs_client.client import LakeFSClient
import boto3
from lakefs_client.api import objects_api
from deltalake.writer import write_deltalake
import s3fs
import pyarrow as pa

lakefsEndPoint = 'http://127.0.0.1:59786/'  # MINIKUBE SERVICE lakefs-service -n namespace (forwarding port etc.)
lakefsAccessKey = 'key'
lakefsSecretKey = 'secret'
repo_name = "internal"

import os
os.environ["AWS_ACCESS_KEY_ID"] = lakefsAccessKey
os.environ["AWS_SECRET_ACCESS_KEY"] = lakefsSecretKey
os.environ["AWS_ENDPOINT"] = lakefsEndPoint
os.environ["AWS_REGION"] = "us-east-1"  # Not sure about this one but just did what you said.

# lakeFS credentials and endpoint
configuration = lakefs_client.Configuration()
configuration.username = lakefsAccessKey
configuration.password = lakefsSecretKey
configuration.host = lakefsEndPoint
lakefs = LakeFSClient(configuration)

storage_options = {"AWS_ACCESS_KEY_ID": lakefsAccessKey,
                   "AWS_SECRET_ACCESS_KEY": lakefsSecretKey,
                   "AWS_ENDPOINT": lakefsEndPoint,
                   "AWS_REGION": "us-east-1",  # Not sure about this one but just did what you said.
                   "AWS_STORAGE_ALLOW_HTTP": "true",
                   "AWS_S3_ALLOW_UNSAFE_RENAME": "true"
                  }

fs = s3fs.S3FileSystem(
    endpoint_url="http://127.0.0.1:59786/",
    secret=lakefsSecretKey,
    key=lakefsAccessKey
)

with fs.open('s3a://repository/branch/lakes.parquet', 'rb') as f:  # This works as expected
    df = pd.read_parquet(f)
    df = df.astype(str)
    print(df.head())

# Initialize the Delta table
deltalake.write_deltalake(table_or_uri='s3a://repository/branch/newTable',
                          data=df,
                          mode='overwrite',
                          storage_options=storage_options)
```

Robin Moffatt

08/24/2023, 8:26 AM
Hmmm, I don't know, sorry. I'd suggest trying the Delta Lake community (e.g. https://github.com/delta-io/delta-rs/issues and also their Slack: https://github.com/delta-io/delta#community). I'd also maybe start from the notebook I shared, modify it one step at a time towards your code, and see where it starts to break.

Eirik Knævelsrud

08/24/2023, 8:28 AM
Yes, I will try. Thank you a lot for the help! I think this is more of a compatibility issue in Delta Lake than in lakeFS.