# help
d
Hello, I'm new to lakeFS and would like to ask, does anyone have an example of writing a Zarr store using xarray with lakeFS? Here is my code trying to do that, but the last `to_zarr()` step is getting errors.
Copy code
import dask.array as da
import xarray as xr
import numpy as np
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient

configuration = lakefs_client.Configuration()
configuration.username = access_key_id
configuration.password = secret_access_key
configuration.host = endpoint_url
client = LakeFSClient(configuration)

repo = "zarr-test"
branch = "zarr-store"
client.branches.create_branch(
    repository=repo,
    branch_creation=models.BranchCreation(
        name=branch,
        source="main"))

# Create a seeded random array
state = da.random.RandomState(1234)
shape = (180, 360, 400)
chunk_shape = (36, 72, 200)
nlats, nlons, ntimes = shape
arr = state.random(shape, chunks=chunk_shape)
ds = xr.Dataset(
    data_vars={
        "precipitation": xr.DataArray(arr, dims=('lat', 'lon', 'time'))
    },
    coords={
        "lat": xr.DataArray(np.linspace(-90, 90, num=nlats, endpoint=False), dims='lat'),
        "lon": xr.DataArray(np.linspace(-180, 180, num=nlons, endpoint=False), dims='lon'),
        "time": xr.date_range(start="2000-06-01", freq="D", periods=ntimes)
    },
    attrs={
        "description": "GPM IMERG test dataset"
    }
)

# Write the first 200 time slices
ds_0 = ds.isel(time=slice(0, 200))
s3a_gateway_path = f's3a://{repo}/{branch}/precipitation_data.zarr'
task = ds_0.to_zarr(s3a_gateway_path, 
                    # zarr_version=3, 
                    mode='w', 
                    compute=False)
Without `zarr_version`, I get `PermissionError: Access Denied`. If I set `zarr_version=3`, I get `KeyError: 'zarr.json'`. Maybe I am setting the `s3a_gateway_path` incorrectly?
a
I don't know about Zarr stores or how to use the S3A gateway with Zarr, but the S3A gateway configuration requires setting a few other parameters. For example, I use the following code to set the S3A parameters in Spark:
Copy code
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("lakeFS / Jupyter") \
        .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.endpoint", lakefsEndPoint) \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.access.key", lakefsAccessKey) \
        .config("spark.hadoop.fs.s3a.secret.key", lakefsSecretKey) \
        .getOrCreate()
spark.sparkContext.setLogLevel("INFO")

spark
d
I set this configuration but I'm not sure how it interacts with writing Zarr. If anyone knows how to use `xarray.to_zarr()` with lakeFS, please let me know! Something else I tried is to write the Zarr store (which can be thought of as a directory of subdirectories and files) locally and upload it with `client.objects.upload_object()`. But `content` is expected to be an `IOBase` and, I believe, only a single file, not a group like the Zarr store. So does lakeFS support writing Zarr?
b
Hi @Dieu M. Nguyen - using the lakeFS API will work for opening a branch. From the docs, xarray's target for the dataset is an fsspec mapping, so you need to use s3fs to access the lakeFS endpoint.
Example (update the endpoint to the lakeFS host):
Copy code
import s3fs

s3 = s3fs.S3FileSystem(
    anon=False,
    endpoint_url="http://localhost:8000",
    key="AKIAIOSFODNN7EXAMPLE",
    secret="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
)
s3_path = f"{repo}/{branch}/{name}"
s3store = s3.get_mapper(s3_path)
o
Thanks @Barak Amar - @Dieu M. Nguyen, here's a full example of writing to lakeFS using `to_zarr`:
Copy code
# requires:
# pip install s3fs numpy xarray zarr

import numpy as np
import xarray as xr

lakefs = {
    "key": "AKIAIOSFOLQUICKSTART",
    "secret": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    "endpoint_url": "http://localhost:8000",
}

data = np.random.rand(2000, 5000)
da = xr.DataArray(data)
da.to_zarr('s3://my-repository/my-branch/some/path/', storage_options=lakefs)
d
Thank you so much both! I will give this a try today and reach back out with any questions.
j
hey @Dieu M. Nguyen, did you get this all working correctly? I'm trying to store Zarr on lakeFS but having issues committing a Zarr file back to the repo with the same name (to have version control of my Zarr file). I believe it's because my Zarr file is technically a directory path, and lakeFS doesn't like that. I understand this was quite a while ago 😄
e
Hi @James Hodson,
having issues committing a Zarr file back to the repo
Can you share more details? Are you getting an error message?
j
Hi! Yes of course, let me explain what I am trying to do. We are hoping to use lakeFS as a versioning tool for our data. Our data consists of Zarr files which we will be reading with xarray. The plan is to store these in our S3 storage and use lakeFS to manage the version control. For testing purposes, I've just created a dummy .zarr file and put it into a local MinIO storage container, and then run lakeFS/lakefs-spec to create a repository and ingest that file. This all works perfectly, and I can read my .zarr file with xarray from the repository too. However, if I then want to make a change to that xarray dataset and send it back to lakeFS under the same name, I get the error:
ContainsGroupError: path '' contains a group
which I assume is because Zarr works with some file structure? I'm not too sure. This is the code I am currently using, which works up until I get the error:
Copy code
from lakefs_spec import LakeFSFileSystem
import pandas as pd
import lakefs
import xarray as xr
import zarr
import numpy as np

fs = LakeFSFileSystem()

REPO_NAME = "zarr-test"
repo = lakefs.Repository(REPO_NAME, fs.client).create(storage_namespace="s3://zarr-example")

# I ingest my data here through the UI instead of with Python as it's easier at the moment

NEW_BRANCH = lakefs.Branch(REPO_NAME, "transform-raw-data", client=fs.client)
NEW_BRANCH.create("main")

z = xr.open_zarr("lakefs://zarr-test/main/xr_example.zarr")

# make some changes to `z`...

# And this is where I get the error:
z.to_zarr('lakefs://zarr-test/transform-raw-data/xr_example.zarr')
I hope that helps explain a bit better, I'm not sure if there is something obvious I am missing here
d
@James Hodson Yes, I was successful with writing and reading Zarr using lakeFS! That was a few months ago though, so I'm not sure if there have been any version changes lately. But given that lakeFS is data-format-agnostic, it should work. Given your error about a group already existing, I wonder if you can try including `mode="a"` in `z.to_zarr()`.
Our use cases for lakeFS are quite similar. In my test, I was creating chunked Zarr data (3D arrays) and appending single slices in one dimension. The ending chunk is either updated or newly created. I had no issues using lakeFS. Let me know if you'd like me to share that piece of code.
j
@Dieu M. Nguyen I would love to see that code if you still have it around. Using `mode="a"` helped with getting past my error, so thank you. I think my steps in committing to the repo are what's going wrong now.
d
Copy code
import s3fs
import zarr
import numpy as np
import xarray as xr
import dask.array as da
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient


def main():
    # Configure lakefs
    lakefs = {
        "key": "",
        "secret": "",
        "endpoint_url": "", 
    }

    configuration = lakefs_client.Configuration()
    configuration.username = lakefs['key']
    configuration.password = lakefs['secret']
    configuration.host = lakefs['endpoint_url']
    client = LakeFSClient(configuration)

    repo = "lakefs-zarr-test"
    branch = "zarr-data"

    # Create data
    state = da.random.RandomState(1234)
    shape = (1800, 3600, 4000)
    chunk_shape = (36, 72, 200)
    nlats, nlons, ntimes = shape

    arr = state.random(shape, chunks=chunk_shape)
    ds = xr.Dataset(
        data_vars={
            "precipitation": xr.DataArray(arr, dims=('lat', 'lon', 'time'))
        },
        coords={
            "lat": xr.DataArray(np.linspace(-90, 90, num=nlats, endpoint=False), dims='lat'),
            "lon": xr.DataArray(np.linspace(-180, 180, num=nlons, endpoint=False), dims='lon'),
            "time": xr.date_range(start="2000-06-01", freq="D", periods=ntimes)
        },
        attrs={
            "description": "GPM IMERG test dataset"
        }
    )

    # Write data & commit change
    path = f's3://{repo}/{branch}/precipitation_data.zarr/'
    # (assumes time slices 0-1999 were already written to this store)
    for i in range(2000, 4000):
        ds_slice = ds.isel(time=slice(i, i+1))
        t = ds_slice.indexes['time'][0].isoformat()
        print(f'Writing time slice: {i} -- {t}')
        ds_slice.to_zarr(path, 
                        append_dim='time', 
                        mode="a", 
                        storage_options=lakefs)
        client.commits.commit(
            repository=repo,
            branch=branch,
            commit_creation=models.CommitCreation(
                message=f'Wrote timestep {t}',
                metadata={'using': 'python_api'}))


if __name__ == "__main__":
    main()
@James Hodson Hope this ^ helps!
i
Thanks @Dieu M. Nguyen!
j
@Dieu M. Nguyen you're a legend, thanks so much!