user (08/05/2022, 4:52 PM):
• Data read from S3 using the s3a gateway and Hudi format
%%configure -f
{
"conf": {
"spark.jars.packages": "org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-spark-bundle_2.12:0.7.0",
"spark.hadoop.fs.s3a.access.key": "",
"spark.hadoop.fs.s3a.secret.key": "",
"spark.hadoop.fs.s3a.path.style.access": "true",
"spark.hadoop.fs.s3a.endpoint": "<https://lakefs-internal-endpoint>",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
}
}
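# Read the source Hudi table from the ingestion bucket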
client_df = spark. \
read. \
format("org.apache.hudi"). \
load("<s3://bucket-name/ingestion/database/db/table/>")
• Data written to the main branch created by LakeFS, still using the s3a gateway
common_config = {
'className': 'org.apache.hudi',
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'timestamp',
'hoodie.datasource.write.partitionpath.field': "partition",
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.write.recordkey.field': "id",
'hoodie.table.name': "",
'hoodie.consistency.check.enabled': 'true',
'hoodie.datasource.hive_sync.database': "",
'hoodie.datasource.hive_sync.table': "",
'hoodie.datasource.hive_sync.enable': 'false',
'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator'
}
client_df.write.format('org.apache.hudi')\
.partitionBy("partition")\
.options(**common_config)\
.mode("overwrite")\
.save("<s3a://hudi-integration/main/output-path/>")
• New branch created from main, with a few rows updated (a hedged sketch of the branch creation is shown below)
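Branch creation itself is not shown in the snippets; a minimal sketch of how it could be done with the lakefs-client Python package (the repository name is taken from the s3a paths above, while the endpoint, credentials and the hudi_test branch name are assumptions):
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient

# Assumed placeholders: the LakeFS API host (not the s3a gateway) and a key pair
configuration = lakefs_client.Configuration()
configuration.host = "https://lakefs-internal-endpoint/api/v1"
configuration.username = "<lakefs-access-key-id>"
configuration.password = "<lakefs-secret-access-key>"
client = LakeFSClient(configuration)

# Create the working branch (hudi_test) from main in the hudi-integration repository
client.branches.create_branch(
    repository="hudi-integration",
    branch_creation=models.BranchCreation(name="hudi_test", source="main"),
)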
%%configure -f
{
"conf": {
"spark.jars.packages": "org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-spark-bundle_2.12:0.7.0",
"spark.hadoop.fs.s3a.access.key": "",
"spark.hadoop.fs.s3a.secret.key": "",
"spark.hadoop.fs.s3a.path.style.access": "true",
"spark.hadoop.fs.s3a.endpoint": "<https://lakefs-internal-endpoint>",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
}
}
# Reading Hudi table from LakeFS (main)
client_df = spark. \
read. \
format("org.apache.hudi"). \
load("<s3a://hudi-integration/main/output-path/>")
# Updating Read table
from pyspark.sql.functions import lit
client_df_updated = client_df.limit(2).withColumn("name", lit("Test Hudi"))
# Writing changes in another branch (hudi_test)
common_config = {
'className': 'org.apache.hudi',
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'timestamp',
'hoodie.datasource.write.partitionpath.field': "partition",
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.write.recordkey.field': "id",
'hoodie.table.name': "",
'hoodie.consistency.check.enabled': 'true',
'hoodie.datasource.hive_sync.database': "",
'hoodie.datasource.hive_sync.table': "",
'hoodie.datasource.hive_sync.enable': 'false',
'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator'
}
client_df_updated.write.format('org.apache.hudi')\
.partitionBy("partition")\
.options(**common_config)\
.mode("append")\
.save("<s3a://hudi-integration/otherv2/output-path/>")
• Data updates were committed and the new branch (hudi_test) was merged into main (a hedged sketch of the commit and merge follows)
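The commit and merge were done outside the Spark code; a hedged sketch, reusing the client object from the branch-creation snippet above (the commit message is illustrative):
from lakefs_client import models

# Commit the Hudi upsert on the working branch
client.commits.commit(
    repository="hudi-integration",
    branch="hudi_test",
    commit_creation=models.CommitCreation(message="Upsert updated rows via Hudi"),
)

# Merge the working branch back into main
client.refs.merge_into_branch(
    repository="hudi-integration",
    source_ref="hudi_test",
    destination_branch="main",
)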
What happened:
Data was saved as expected in both branches. However, the files in S3 were duplicated.
It was also noticed that LakeFS can tell which file is the most recent, but it does not replace the old one; it keeps both, since Hudi writes produce files with new names. Moreover, LakeFS cannot tell what has changed from one file to another; it only detects that one branch has one…
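To make the observation concrete: LakeFS diffs refs at the object level, so a diff between main and the working branch lists Hudi's newly named files as added objects rather than row-level changes. A sketch, assuming the same lakefs-client setup as in the branch-creation snippet:
# Object-level diff between main and the working branch; Hudi's new file names
# appear as added objects, while the superseded files are still present.
diff = client.refs.diff_refs(
    repository="hudi-integration",
    left_ref="main",
    right_ref="hudi_test",
)
for entry in diff.results:
    print(entry.type, entry.path)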