# help
u
Hey all, I have data landing to s3 from aws firehose, where I cannot use lakefs. What will be the best approach to ingest data to lakefs continuously?
u
Hi @Nicola Corda, we have this on our roadmap https://docs.lakefs.io/understand/roadmap.html#improved-streaming-support-for-apache-kafka as part of supporting data ingestion use cases better. I am less familiar with the AWS Firehose service. Is it a streaming use case where the service endpoint serializes and stores the data for you on S3?
u
Yeah, pretty much. Firehose is connected to a Kinesis stream; it batches the records and then serializes them to S3 in the desired format.
u
Really helpful to avoid having tiny files in the lake.
u
I was thinking about a service that reads the destination bucket of Firehose and ingests the objects into lakeFS continuously, via S3 notifications and Lambda functions (boto3).
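Roughly, the wiring with boto3 would be something like this (bucket name, prefix and Lambda ARN are placeholders, and the Lambda would also need a resource policy that allows S3 to invoke it):

import boto3

# Sketch: subscribe a Lambda to ObjectCreated events on the Firehose prefix
s3 = boto3.client('s3')
s3.put_bucket_notification_configuration(
    Bucket='firehose-destination-bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:ingest-to-lakefs',
            'Events': ['s3:ObjectCreated:Put'],
            'Filter': {'Key': {'FilterRules': [
                {'Name': 'prefix', 'Value': 'firehose/'},
            ]}},
        }]
    },
)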
u
it should be possible to process the S3 bucket events and import the data to lakefs
u
committing at periodic or known times
u
looking at the service to understand if a simple integration can be enabled.
u
I have the feeling that it’s not possible, as it’s an AWS service. We cannot set up the endpoint to point to lakeFS.
u
But we can import the information into lakeFS. The import process doesn't copy the actual data - it just references it. You get the same information inside lakeFS without re-ingesting the objects.
u
Yeah, that could work
u
How do I import without copying data? Do you have a reference?
u
This is the way:
lakectl ingest
Use it to import data into lakeFS, assuming lakeFS has access to the data.
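For example, something along these lines (paths are placeholders and exact flags may differ between lakectl versions):
lakectl ingest --from s3://firehose-destination-bucket/firehose/ --to lakefs://repo1/main/firehose/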
u
I'll look into it and update here tomorrow
u
Hi @Nicola Corda, I managed to set up something that works for me based on what we discussed.
u
The idea is to have Firehose deliver the data into an S3 bucket; I used a specific prefix. Using an S3 bucket notification on that prefix, a Lambda function is triggered on new object creation. The function calls the lakeFS API to ingest the objects. The solution assumes that lakeFS can access the objects created by Firehose. The Lambda function I wrote is in Python and doesn't include any error handling:
u
import json
import time
import base64
from urllib import request, parse

# lakeFS connection details - endpoint, repository, branch and credentials
LAKEFS_BASE_URI = 'https://<lakeFS endpoint>/api/v1'
LAKEFS_REPO = 'repo1'
LAKEFS_BRANCH = 'main'
LAKEFS_KEY = '<lakefs key>'
LAKEFS_SECRET = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'

# Pre-compute the HTTP basic auth value from the lakeFS key pair
LAKEFS_CREDENTIALS = ('%s:%s' % (LAKEFS_KEY, LAKEFS_SECRET))
LAKEFS_ENCODED_CREDENTIALS = base64.b64encode(LAKEFS_CREDENTIALS.encode('utf-8'))

def lambda_handler(event, context):
    record_count = 0
    for record in event['Records']:
        # Only handle new objects written by Firehose
        if record['eventName'] != 'ObjectCreated:Put':
            continue
        s3_event = record['s3']
        s3_bucket_name = s3_event['bucket']['name']
        s3_object = s3_event['object']
        # Object keys in S3 notifications are URL-encoded (e.g. spaces become '+')
        s3_object_key = parse.unquote_plus(s3_object['key'])
        obj = "s3://{}/{}".format(s3_bucket_name, s3_object_key)

        # Stage the object in lakeFS by reference - link its physical address
        # into the branch without copying the data
        params = { 'path': s3_object_key }
        req_url = '{}/repositories/{}/branches/{}/objects?{}'.format(
            LAKEFS_BASE_URI, LAKEFS_REPO, LAKEFS_BRANCH, parse.urlencode(params))
        data = {
            'physical_address': obj,
            'checksum': s3_object['eTag'],
            'size_bytes': int(s3_object['size']),
            'mtime': int(time.time()),
        }
        headers = {
            'Content-Type': 'application/json; charset=utf-8',
            'Authorization': 'Basic %s' % LAKEFS_ENCODED_CREDENTIALS.decode("utf-8"),
        }
        req = request.Request(req_url, method='PUT',
                              data=json.dumps(data).encode('utf-8'), headers=headers)
        request.urlopen(req)  # no error handling - a failed request will raise
        record_count += 1

    return {
        'statusCode': 200,
        'body': json.dumps({"objects": record_count})
    }
u
Another important aspect is the data commit. As this function can run in parallel, committing is suggested to run on a timed event and not as part of the notification processing - unless you deliver these messages through another pipeline where you can guarantee there is a single writer.
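For example, a separate Lambda on a scheduled event could issue the commit through the lakeFS API, using the same endpoint, repo, branch and credentials as above (rough sketch only):

import json
import base64
from urllib import request

# Same lakeFS endpoint, repo, branch and credentials as in the ingest function
LAKEFS_BASE_URI = 'https://<lakeFS endpoint>/api/v1'
LAKEFS_REPO = 'repo1'
LAKEFS_BRANCH = 'main'
LAKEFS_ENCODED_CREDENTIALS = base64.b64encode(b'<lakefs key>:<lakefs secret>')

def lambda_handler(event, context):
    # Commit whatever was staged on the branch since the last run
    # (lakeFS may reject the commit if there are no changes - handle as needed)
    req_url = '{}/repositories/{}/branches/{}/commits'.format(
        LAKEFS_BASE_URI, LAKEFS_REPO, LAKEFS_BRANCH)
    data = {'message': 'periodic commit of firehose ingest'}
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': 'Basic %s' % LAKEFS_ENCODED_CREDENTIALS.decode('utf-8'),
    }
    req = request.Request(req_url, method='POST',
                          data=json.dumps(data).encode('utf-8'), headers=headers)
    resp = request.urlopen(req)
    return {'statusCode': resp.status}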