user
12/01/2021, 11:01 AMuser
12/01/2021, 11:10 AMuser
12/01/2021, 11:13 AMuser
12/01/2021, 11:13 AMuser
12/01/2021, 11:15 AMuser
12/01/2021, 11:16 AMuser
12/01/2021, 11:16 AMuser
12/01/2021, 11:19 AMuser
12/01/2021, 4:31 PMuser
12/01/2021, 4:33 PMuser
12/01/2021, 4:55 PMuser
12/01/2021, 4:56 PMuser
12/01/2021, 4:58 PMlakectl ingest
Use it to import data into lakeFS,
assuming lakeFS has access to the data.user
12/01/2021, 5:01 PMuser
12/02/2021, 12:18 PMuser
12/02/2021, 12:19 PMuser
12/02/2021, 12:21 PMimport json
import time
import base64
from urllib import request, parse
# --- lakeFS connection settings -------------------------------------------
# Base URL of the lakeFS REST API; replace <lakeFS endpoint> with your host.
LAKEFS_BASE_URI = 'https://<lakeFS endpoint>/api/v1'
# Target repository and branch that incoming S3 objects are staged onto.
LAKEFS_REPO = 'repo1'
LAKEFS_BRANCH = 'main'
# Access-key pair used for HTTP Basic auth against lakeFS.
# NOTE(review): secret is hard-coded in source (the AWS docs example key) —
# move to environment variables or AWS Secrets Manager before deploying.
LAKEFS_KEY = '<lakefs key>'
LAKEFS_SECRET = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
# "key:secret" joined and base64-encoded once at import time, ready for the
# "Authorization: Basic ..." header built in lambda_handler.
LAKEFS_CREDENTIALS = ('%s:%s' % (LAKEFS_KEY, LAKEFS_SECRET))
LAKEFS_ENCODED_CREDENTIALS = base64.b64encode(LAKEFS_CREDENTIALS.encode('utf-8'))
def lambda_handler(event, context):
    """Stage newly-created S3 objects onto a lakeFS branch without copying.

    Triggered by S3 event notifications. For each ``ObjectCreated:Put``
    record, issues a PUT to the lakeFS staging API so the object's physical
    S3 address is linked into ``LAKEFS_BRANCH`` of ``LAKEFS_REPO``.

    Args:
        event: S3 notification payload (``event['Records']`` list).
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and a JSON ``body`` reporting how many
        objects were staged.

    Raises:
        urllib.error.HTTPError: if lakeFS rejects a staging request
            (non-2xx), so the failure surfaces to Lambda for retry.
    """
    record_count = 0
    for record in event['Records']:
        # Only plain PUT creations are staged; skip copies, multipart
        # completes, deletes, etc.
        if record['eventName'] != 'ObjectCreated:Put':
            continue
        s3_event = record['s3']
        s3_bucket_name = s3_event['bucket']['name']
        s3_object = s3_event['object']
        # S3 event notifications URL-encode the object key (spaces arrive
        # as '+'); decode it so lakeFS records the real physical address.
        s3_object_key = parse.unquote_plus(s3_object['key'])
        obj = "s3://{}/{}".format(s3_bucket_name, s3_object_key)
        params = {'path': s3_object_key}
        req_url = '{}/repositories/{}/branches/{}/objects?{}'.format(
            LAKEFS_BASE_URI, LAKEFS_REPO, LAKEFS_BRANCH,
            parse.urlencode(params))
        data = {
            'physical_address': obj,
            'checksum': s3_object['eTag'],
            'size_bytes': int(s3_object['size']),
            "mtime": int(time.time()),
        }
        headers = {
            'Content-Type': 'application/json; charset=utf-8',
            'Authorization': 'Basic %s' % LAKEFS_ENCODED_CREDENTIALS.decode("utf-8"),
        }
        req = request.Request(req_url, method='PUT',
                              data=json.dumps(data).encode('utf-8'),
                              headers=headers)
        # Context manager closes the HTTP connection (the original leaked
        # it); count the record only after the request succeeds.
        with request.urlopen(req):
            record_count += 1
    return {
        'statusCode': 200,
        'body': json.dumps({"objects": record_count})
    }
user
12/02/2021, 12:23 PM