
Aapta Bhatt

04/17/2023, 12:09 PM
Could you please share a Python program which does the above job without the command line?

Or Tzabary

04/17/2023, 12:24 PM
unfortunately the import functionality is unavailable via the API, it's only available through the CLI or UI
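For the CLI route, the import can still be scripted; a minimal sketch with lakectl, where the bucket, repo, and branch names below are placeholders:

# stage objects from an S3 prefix into a lakeFS branch, then commit them
lakectl ingest --from s3://my-bucket/datasets/ --to lakefs://my-repo/import/datasets/
lakectl commit lakefs://my-repo/import -m "import datasets"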

Aapta Bhatt

04/17/2023, 12:54 PM
hmm

Or Tzabary

04/17/2023, 12:54 PM
@Aapta Bhatt if that’s something you’re interested in, feel free to open an issue in the lakeFS repository. If we see more engagement with this requirement, we’ll take that into account and prioritize it. Or of course, if you’d like to contribute yourself, we’d appreciate it

Aapta Bhatt

04/17/2023, 12:56 PM
Thanks for the response. Then how are people ingesting data programmatically?

Or Tzabary

04/17/2023, 12:59 PM
in most cases, the ingestion part is not a periodic action and is executed as part of onboarding or adding specific datasets into lakeFS. Once the data is ingested into lakeFS, we see people mostly interact with the lakeFS API directly; for instance, once the import took place, they will upload objects to lakeFS directly instead of to the bucket itself
mind sharing your use-case?
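Uploading to lakeFS directly from Python looks roughly like this; a minimal sketch with the lakefs_client package, where the endpoint, credentials, repo, and paths are placeholders:

import lakefs_client
from lakefs_client.client import LakeFSClient

# configure the client against your lakeFS installation (placeholder values)
configuration = lakefs_client.Configuration()
configuration.host = 'https://example.lakefscloud.io'
configuration.username = 'ACCESS_KEY'
configuration.password = 'SECRET_KEY'
client = LakeFSClient(configuration)

# upload a local file straight to a branch instead of writing to the raw bucket
with open('data.parquet', 'rb') as f:
    client.objects.upload_object(
        repository='my-repo',
        branch='main',
        path='datasets/data.parquet',
        content=f)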

Aapta Bhatt

04/17/2023, 1:10 PM
My use case is ETL: we fetch data from an API, store it in an S3 bucket (incremental data), then import it into lakeFS

Or Tzabary

04/17/2023, 1:20 PM
> store it to s3 bucket (incremental data)
this step can be replaced with storing it in lakeFS directly (which, behind the scenes, will place the objects in your S3 bucket); this way you’ll be able to remove the import step into lakeFS. lakeFS is compatible with S3 (using the S3 gateway), so it should be easy to switch using your existing tools, or if you like, you can use the lakeFS API directly
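Switching an existing S3 tool over mostly means pointing its endpoint at lakeFS and addressing objects as repo/branch/path; a rough sketch with boto3, where the endpoint, credentials, repo, and key are placeholders:

import boto3

# any S3 client works against the lakeFS S3 gateway; credentials are the lakeFS key pair
s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefscloud.io',
    aws_access_key_id='ACCESS_KEY',
    aws_secret_access_key='SECRET_KEY')

# the "bucket" is the repository and the key is prefixed with the branch name
with open('data.parquet', 'rb') as f:
    s3.put_object(Bucket='my-repo', Key='main/datasets/data.parquet', Body=f)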
by the way, if I recall, @Amit Kesarwani created a Python script that does the import logic… Amit, if you do have it, mind sharing?

Amit Kesarwani

04/17/2023, 3:09 PM
@Aapta Bhatt
lakefsEndPoint = 'https://aaaaaa.lakefscloud.io'
lakefsAccessKey = 'aaaaaa'
lakefsSecretKey = 'aaaaaaa'
repo = "aaaa"
importBranch = "import"
bucketName = "aaaa"  # S3 bucket to import from (placeholder)
objectStoreURI = "s3://" + bucketName # A URI on the object store to import from.
mainBranch = "main"

import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient
import datetime

# lakeFS credentials and endpoint
configuration = lakefs_client.Configuration()
configuration.username = lakefsAccessKey
configuration.password = lakefsSecretKey
configuration.host = lakefsEndPoint

client = LakeFSClient(configuration)

# create a dedicated import branch off main to land the imported objects
client.branches.create_branch(
    repository=repo,
    branch_creation=models.BranchCreation(
        name=importBranch,
        source=mainBranch))

from lakefs_client.api import import_api
from lakefs_client.model.branch_creation import BranchCreation
from lakefs_client.model.stage_range_creation import StageRangeCreation
from lakefs_client.model.meta_range_creation import MetaRangeCreation
from lakefs_client.model.range_metadata import RangeMetadata

def stage_range_creation(after, continuation_token):
    # describe one page of objects to stage from the source object store URI
    return StageRangeCreation(
        from_source_uri=objectStoreURI,
        after=after,
        prepend="",
        continuation_token=continuation_token,
    )

with lakefs_client.ApiClient(configuration) as api_client:
    # Create an instance of the API class
    api_instance = import_api.ImportApi(api_client)
    repository = repo # str

    # example passing only required values which don't have defaults set
    try:
        after = ""
        continuation_token = ""
        object_imported = 0
        ranges = []
        
        while True:
            # create a lakeFS range file from the source uri
            api_response = api_instance.ingest_range(repository, stage_range_creation(after, continuation_token))
            ranges.append(RangeMetadata(
                    id=api_response.range.id,
                    min_key=api_response.range.min_key,
                    max_key=api_response.range.max_key,
                    count=api_response.range.count,
                    estimated_size=api_response.range.estimated_size))
            object_imported = object_imported + api_response.range.count
            
            if not api_response.pagination.has_more:
                break
            else:
                after = api_response.pagination.last_key
                continuation_token = api_response.pagination.continuation_token
        
        # combine all staged ranges into a single metarange and create it server-side
        meta_range_creation = MetaRangeCreation(
            ranges=ranges,
        )
        api_response_create_meta_range = api_instance.create_meta_range(repository, meta_range_creation)
        print("Objects imported: " + str(object_imported))
        
    except lakefs_client.ApiException as e:
        print("Exception when calling ImportApi->ingest_range: %s\n" % e)
        
# commit the imported metarange to the import branch
client.commits.commit(
    repository=repo,
    branch=importBranch,
    commit_creation=models.CommitCreation(
        message='v1.0',
        metadata={'version': '1.0'}),
    source_metarange=api_response_create_meta_range.id)
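To land the imported data on main afterwards, a merge from the import branch might look like this; a sketch assuming the same client object as above:

# merge the committed import branch into main
client.refs.merge_into_branch(
    repository=repo,
    source_ref=importBranch,
    destination_branch=mainBranch)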

Aapta Bhatt

04/21/2023, 7:38 AM
Thanks Amit 🙂