Conor Simmons (02/03/2023, 8:48 PM)
What is the expected behavior of `lakectl` when performing multiple imports?
Let's say I have S3 bucket 1 with cat.jpg at s3://cat_bucket/cat.jpg and S3 bucket 2 with dog.jpg at s3://dog_bucket/dog.jpg.
• `lakectl import --from s3://cat_bucket/ --to lakefs://repo/main/` and manually merge _main_imported
• `lakectl import --from s3://dog_bucket/ --to lakefs://repo/main/`
Now, when I go to compare branches, I can see that cat.jpg will be removed if I merge.
Output (not expected): only dog.jpg is now in main

Amit Kesarwani (02/03/2023, 9:07 PM)

Conor Simmons (02/03/2023, 9:11 PM)

Amit Kesarwani (02/03/2023, 9:21 PM)

Conor Simmons (02/03/2023, 9:24 PM)
I want to be able to import from multiple S3 buckets/paths/prefixes and combine those imports into a central repository.
Amit Kesarwani (02/03/2023, 9:28 PM)

Conor Simmons (02/03/2023, 9:29 PM)

Amit Kesarwani (02/03/2023, 9:30 PM)
You can use the `stage_object` API in Python, or `lakectl fs stage` if you know the file size (or get the file size from S3), which might be the easier route.

Conor Simmons (02/03/2023, 9:39 PM)

Amit Kesarwani (02/03/2023, 9:40 PM)
Use the `stage_object` API.

Conor Simmons (02/03/2023, 9:40 PM)

Amit Kesarwani (02/03/2023, 9:41 PM)

Conor Simmons (02/03/2023, 9:41 PM)

Amit Kesarwani (02/03/2023, 9:59 PM)
I tried `stage_object` and it worked. You can stage a single file or a full folder. Here is the code (make sure to pass the file size in bytes to the `object_stage` function):
```python
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient
from lakefs_client.model.object_stage_creation import ObjectStageCreation
from lakefs_client.model.object_user_metadata import ObjectUserMetadata

LAKEFS_ENDPOINT = "lakefs endpoint"
LAKEFS_ACCESS_KEY = "xxxxx"
LAKEFS_SECRET_KEY = "xxxxxx"

configuration = lakefs_client.Configuration()
configuration.username = LAKEFS_ACCESS_KEY
configuration.password = LAKEFS_SECRET_KEY
configuration.host = LAKEFS_ENDPOINT
client = LakeFSClient(configuration)


def list_files(repo_name, branch):
    api_response = client.objects.list_objects(repo_name, branch)
    print([obj["path"] for obj in api_response.results])


def object_stage(source_uri):
    object_stage_creation = ObjectStageCreation(
        physical_address=source_uri,
        checksum="",
        size_bytes=1,  # replace with the actual file size in bytes
        mtime=1,
        metadata=ObjectUserMetadata(  # optional
            key="version: 1.0",
        ),
        content_type="",
    )  # ObjectStageCreation |
    return object_stage_creation


def stage_objects(repo_name, import_branch, source_branch, source_uri, path):
    object_stage_creation = object_stage(source_uri)
    print(object_stage_creation)
    try:
        api_response_1 = client.objects.stage_object(
            repo_name, import_branch, path, object_stage_creation)
        print(api_response_1)
    except lakefs_client.ApiException as e:
        print("Exception when calling objects->stage_object: %s\n" % e)
    client.commits.commit(
        repository=repo_name,
        branch=import_branch,
        commit_creation=models.CommitCreation(
            message='v1.0',
            metadata={'version': '1.0'}))


repo = "my-repo"
import_branch = "_main_imported"
source_branch = "main"

client.branches.create_branch(
    repository=repo,
    branch_creation=models.BranchCreation(
        name=import_branch,
        source=source_branch))

source_uri = "s3://sample-dog-images/n02085620-Chihuahua/n02085620_10074.jpg"  # A URI on the object store to import from.
path = "dogs/n02085620_10074.jpg"
stage_objects(repo, import_branch, source_branch, source_uri, path)
list_files(repo, import_branch)

source_uri = "s3://sample-dog-images/n02085620-Chihuahua/n02085620_10131.jpg"  # A URI on the object store to import from.
path = "dogs/n02085620_10131.jpg"
stage_objects(repo, import_branch, source_branch, source_uri, path)
list_files(repo, import_branch)

client.refs.merge_into_branch(
    repository=repo,
    source_ref=import_branch,
    destination_branch=source_branch)
```
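Amit's earlier suggestion to "get the file size from S3" needs the bucket and key split out of the `s3://` URI first. A small stdlib-only helper for that, with the boto3 call it would feed sketched in a comment (the boto3 line is an untested sketch and needs AWS credentials):

```python
from urllib.parse import urlparse


def parse_s3_uri(uri):
    """Split an s3://bucket/key URI into a (bucket, key) pair."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError("not an s3 URI: %s" % uri)
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = parse_s3_uri(
    "s3://sample-dog-images/n02085620-Chihuahua/n02085620_10074.jpg")
print(bucket, key)
# → sample-dog-images n02085620-Chihuahua/n02085620_10074.jpg

# With boto3 (untested sketch):
#   size = boto3.client("s3").head_object(Bucket=bucket, Key=key)["ContentLength"]
# and pass that size as size_bytes in ObjectStageCreation instead of the
# placeholder value 1 used in the code above.
```

On the CLI side, `lakectl fs stage` carries the same information; the lakeFS docs list `--location` and `--size` flags for it, but check `lakectl fs stage --help` for the exact flags in your version.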
Conor Simmons (02/03/2023, 10:07 PM)

Amit Kesarwani (02/03/2023, 10:08 PM)