# help
u
Hello! In my company, we have a data lake with Hudi and we tried to use lakeFS. However, we realized it doesn't work well with Apache Hudi because many files were duplicated in our tests. Has anyone tried lakeFS with Hudi?
u
Hey @Isabela Angelo and welcome! Let me see if I can find something
u
They should work together. You can read a blog post we released some time ago comparing the different data lake table formats: https://lakefs.io/hudi-iceberg-and-delta-lake-data-lake-table-formats-compared/
u
I'll try to see if I can find snippets to share. In the meantime, mind explaining what you tried to do, what the expected outcome was, and what you got?
u
Hi @Isabela Angelo I think what you experienced can be attributed to this open issue: https://github.com/treeverse/lakeFS/issues/1933 Hudi creates intermediate files that are not committed and hence cannot be hard-deleted using lakeFS GC. Does this make sense to you?
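To make "intermediate files that are not committed" concrete, here is a minimal sketch (not from this thread; the endpoint, repository and branch names, and the credential placeholders are assumptions) of listing a branch's uncommitted changes through the lakeFS REST API:
# Hedged sketch: list objects staged on a lakeFS branch but not yet committed.
# Hudi's intermediate/marker files would show up in this uncommitted diff.
import requests

LAKEFS_ENDPOINT = "https://lakefs-internal-endpoint"  # assumed lakeFS server URL
AUTH = ("<access-key>", "<secret-key>")               # placeholder credentials
REPO = "hudi-integration"                             # hypothetical repository name
BRANCH = "main"                                       # hypothetical branch name

resp = requests.get(
    f"{LAKEFS_ENDPOINT}/api/v1/repositories/{REPO}/branches/{BRANCH}/diff",
    auth=AUTH,
)
resp.raise_for_status()

# Each entry describes one uncommitted change (added / removed / changed) and its path
for change in resp.json().get("results", []):
    print(change["type"], change["path"])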
u
Hi @Or Tzabary and @einat.orr! Thank you for the fast reply =] @Or Tzabary The test steps:
• Data read from S3 using the S3A gateway and Hudi format
%%configure -f
{
"conf": {
        "spark.jars.packages": "org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-spark-bundle_2.12:0.7.0",
        "spark.hadoop.fs.s3a.access.key": "",
        "spark.hadoop.fs.s3a.secret.key": "",
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.endpoint": "<https://lakefs-internal-endpoint>",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    }
}
# Reading the source table in Hudi format through the S3A gateway
client_df = spark. \
  read. \
  format("org.apache.hudi"). \
  load("s3://bucket-name/ingestion/database/db/table/")
• Data written to the main branch created by lakeFS, still using the S3A gateway
common_config = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'timestamp',
    'hoodie.datasource.write.partitionpath.field': "partition",
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.write.recordkey.field': "id",
    'hoodie.table.name': "",
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': "",
    'hoodie.datasource.hive_sync.table': "",
    'hoodie.datasource.hive_sync.enable': 'false',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator'
}

# Writing the table in Hudi format to the main branch through the S3A gateway
client_df.write.format('org.apache.hudi')\
    .partitionBy("partition")\
    .options(**common_config)\
    .mode("overwrite")\
    .save("s3a://hudi-integration/main/output-path/")
• New branch created from main and some records updated
%%configure -f
{
"conf": {
        "spark.jars.packages": "org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-spark-bundle_2.12:0.7.0",
        "spark.hadoop.fs.s3a.access.key": "",
        "spark.hadoop.fs.s3a.secret.key": "",
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.endpoint": "<https://lakefs-internal-endpoint>",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    }
}

# Reading Hudi table from LakeFS (main)
client_df = spark. \
  read. \
  format("org.apache.hudi"). \
  load("<s3a://hudi-integration/main/output-path/>")

# Updating Read table
from pyspark.sql.functions import lit
client_df_updated = client_df.limit(2).withColumn("name", lit("Test Hudi"))

# Writing changes in another branch (hudi_test)
common_config = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'timestamp',
    'hoodie.datasource.write.partitionpath.field': "partition",
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.write.recordkey.field': "id",
    'hoodie.table.name': "",
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': "",
    'hoodie.datasource.hive_sync.table': "",
    'hoodie.datasource.hive_sync.enable': 'false',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator'
}

client_df_updated.write.format('org.apache.hudi')\
    .partitionBy("partition")\
    .options(**common_config)\
    .mode("append")\
    .save("<s3a://hudi-integration/otherv2/output-path/>")
• Data updates were committed and the new branch (hudi_test) was merged into main (see the sketch below)
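For context, a minimal sketch of what the commit-and-merge step could look like against the lakeFS REST API (not taken from the thread; the endpoint, repository name, branch names, commit messages, and credential placeholders are assumptions):
# Hedged sketch: commit the staged Hudi files on the feature branch, then merge it into main.
import requests

LAKEFS_ENDPOINT = "https://lakefs-internal-endpoint"  # assumed lakeFS server URL
AUTH = ("<access-key>", "<secret-key>")               # placeholder credentials
REPO = "hudi-integration"                             # hypothetical repository name
BRANCH = "hudi_test"                                  # feature branch from the test

# Commit everything currently staged on the feature branch
commit = requests.post(
    f"{LAKEFS_ENDPOINT}/api/v1/repositories/{REPO}/branches/{BRANCH}/commits",
    auth=AUTH,
    json={"message": "Update Hudi table on hudi_test"},
)
commit.raise_for_status()

# Merge the feature branch into main
merge = requests.post(
    f"{LAKEFS_ENDPOINT}/api/v1/repositories/{REPO}/refs/{BRANCH}/merge/main",
    auth=AUTH,
    json={"message": "Merge hudi_test into main"},
)
merge.raise_for_status()
print(merge.json())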
What happened:
Data was saved as expected in the branches. However, the files in S3 were duplicated. We also noticed that lakeFS knows how to tell which file is the most up to date, but it doesn't replace the old one; it keeps both, since Hudi writes generate files with new names. Moreover, lakeFS cannot differentiate what has changed from one file to another; it only detects that one branch has one…
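As an illustration of how the duplication shows up, here is a minimal sketch (assumed repository, branch, partition path, endpoint, and credentials, not from the thread) that lists the objects of one Hudi partition on a branch through the lakeFS S3 gateway; after several upserts, multiple Parquet base files accumulate side by side:
# Hedged sketch: list objects under one Hudi partition on a lakeFS branch via the S3 gateway.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="https://lakefs-internal-endpoint",  # assumed lakeFS S3 gateway endpoint
    aws_access_key_id="<access-key>",                 # placeholder credentials
    aws_secret_access_key="<secret-key>",
)

resp = s3.list_objects_v2(
    Bucket="hudi-integration",               # repository name (hypothetical)
    Prefix="main/output-path/partition=1/",  # branch + hypothetical partition path
)

# Each Hudi upsert writes Parquet files with new names, so old and new versions coexist
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])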
u
@einat.orr It seems to be the problem! But I need to read more deeply to be sure 🤔 We ran another test that pointed to this exact problem, but I am not sure if it is the same for the test I wrote above.
u
It really sounds like the garbage collection issue
u
I will follow this issue on GitHub and test again as soon as it's resolved. This file duplication blocked our project due to cost concerns.
u
Thanks again!
u
Thanks @Isabela Angelo
u
Hey @Isabela Angelo, how is it going? I wonder if you've made any progress since our last chat? Is there anything else I can help with?
u
Hi @Or Tzabary Thanks for your message! I am following the GitHub issue and will test again when it's closed. I will let you know 😊 There is nothing else for now... my problem is specifically the file duplication.