Aapta Bhatt
04/17/2023, 12:09 PMOr Tzabary
04/17/2023, 12:24 PM
The `import` functionality is unavailable via the API; it’s only available through the CLI or UI.
Aapta Bhatt
04/17/2023, 12:54 PMOr Tzabary
04/17/2023, 12:54 PMAapta Bhatt
04/17/2023, 12:56 PMOr Tzabary
04/17/2023, 12:59 PMAapta Bhatt
04/17/2023, 1:10 PMOr Tzabary
04/17/2023, 1:20 PM
> store it to s3 bucket (incremental data)
This step can be replaced with storing the data in lakeFS directly (behind the scenes, lakeFS will place the objects in your S3 bucket); this way you’ll be able to remove the import step into lakeFS. lakeFS is compatible with S3 (using the S3 gateway), so it should be easy to switch using your existing tools, or, if you like, you can use the lakeFS API directly.
Amit Kesarwani
04/17/2023, 3:09 PM
lakefsEndPoint = 'https://aaaaaa.lakefscloud.io'
# Repository to import into, the branch that receives the import, and the
# branch the import branch is created from.
repo = "aaaa"
mainBranch = "main"
importBranch = "import"

# lakeFS credentials (the values here look like redacted placeholders).
lakefsAccessKey = 'aaaaaa'
lakefsSecretKey = 'aaaaaaa'

# A URI on the object store to import from.
# NOTE(review): bucketName is not defined in the visible snippet; it must be
# assigned before this line runs.
objectStoreURI = "s3://" + bucketName
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient
import datetime
# Build the lakeFS client from the endpoint and credentials defined above.
configuration = lakefs_client.Configuration()
configuration.host = lakefsEndPoint
configuration.username = lakefsAccessKey
configuration.password = lakefsSecretKey
client = LakeFSClient(configuration)

# Create the import branch, branching off the main branch.
new_branch = models.BranchCreation(name=importBranch, source=mainBranch)
client.branches.create_branch(repository=repo, branch_creation=new_branch)
from lakefs_client.api import import_api
from lakefs_client.model.branch_creation import BranchCreation
from lakefs_client.model.stage_range_creation import StageRangeCreation
from lakefs_client.model.meta_range_creation import MetaRangeCreation
from lakefs_client.model.range_metadata import RangeMetadata
def stage_range_creation(after, continuation_token):
    """Build the StageRangeCreation request for one ingest_range call.

    The source URI is always the module-level ``objectStoreURI`` and
    ``prepend`` is always the empty string; only the two pagination
    cursors vary between calls.

    Args:
        after: Value passed through as the request's ``after`` field.
        continuation_token: Value passed through as the request's
            ``continuation_token`` field.

    Returns:
        A ``StageRangeCreation`` populated with the fields above.
    """
    request_fields = {
        "from_source_uri": objectStoreURI,
        "after": after,
        "prepend": "",
        "continuation_token": continuation_token,
    }
    return StageRangeCreation(**request_fields)
# Import the objects under objectStoreURI into lakeFS metadata:
#   1. call ingest_range repeatedly, collecting one RangeMetadata per response;
#   2. combine the collected ranges into a single metarange.
# The id of `api_response_create_meta_range` is consumed by the commit that
# follows this block.
with lakefs_client.ApiClient(configuration) as api_client:
    # Create an instance of the API class
    api_instance = import_api.ImportApi(api_client)
    repository = repo

    # Pagination cursors: empty for the first request, then refreshed from
    # each response until the server reports there is nothing more.
    after = ""
    continuation_token = ""
    object_imported = 0
    ranges = []
    try:
        while True:
            # Create a lakeFS range file from the source URI.
            api_response = api_instance.ingest_range(
                repository,
                stage_range_creation(after, continuation_token),
            )
            ranges.append(RangeMetadata(
                id=api_response.range.id,
                min_key=api_response.range.min_key,
                max_key=api_response.range.max_key,
                count=api_response.range.count,
                estimated_size=api_response.range.estimated_size))
            object_imported += api_response.range.count
            if not api_response.pagination.has_more:
                break
            after = api_response.pagination.last_key
            continuation_token = api_response.pagination.continuation_token

        meta_range_creation = MetaRangeCreation(
            ranges=ranges,
        )
        api_response_create_meta_range = api_instance.create_meta_range(
            repository, meta_range_creation)
        print("Object Imported: "+str(object_imported))
    except lakefs_client.ApiException as e:
        print("Exception when calling ImportApi->ingest_range: %s\n" % e)
        # Re-raise instead of swallowing: without a metarange the commit that
        # follows would fail with a NameError on
        # `api_response_create_meta_range`, hiding the real API error.
        raise
# Commit the metarange produced by the import onto the import branch.
import_commit = models.CommitCreation(
    message='v1.0',
    metadata={'version': '1.0'},
)
client.commits.commit(
    repository=repo,
    branch=importBranch,
    commit_creation=import_commit,
    source_metarange=api_response_create_meta_range.id,
)
Aapta Bhatt
04/21/2023, 7:38 AM