# help
a
Hi! I am trying to create a Spark Structured Streaming job with lakeFS and Iceberg. I am using the following config / code and it works fine for standard batch jobs, but I am struggling a little with configuring the checkpoint location properly. Is there a way to point it at the lakeFS repo, like I am doing for the rest of the Iceberg tables? (I tried looking around for an example but unfortunately failed to find one.) Thanks in advance! The code I am running:
spark = SparkSessionIcebergLakefs().get_session()

df = (spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", "1720552141")
    .load("lakefs.main.db.table")
)

table = "article_titles"

(df.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", <what-the-heck-do-i-put-here?>)  # <---- the part I am struggling with
    .toTable(f"lakefs.main.db.table1")
    .awaitTermination(timeout=10)
)
Spark config:
spark_jars = ["org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0",
              "io.lakefs:lakefs-iceberg:0.1.4"]

spark = SparkSession.builder.appName(f"lakeFS sample / {repo.id}") \
    .config("spark.jars.packages", ",".join(spark_jars)) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog") \
    .config("spark.sql.catalog.lakefs.warehouse", "lakefs://test123") \
    .config("spark.sql.catalog.lakefs.uri", LAKEFSENDPOINT) \
    .config("spark.sql.catalog.lakefs.cache-enabled", "false") \
    .config("spark.sql.defaultCatalog", "lakefs") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.endpoint", LAKEFSENDPOINT) \
    .config("spark.hadoop.fs.s3a.access.key", LAKEFSACCESSKEY) \
    .config("spark.hadoop.fs.s3a.secret.key", LAKEFSSECRETKEY) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()
spark.sparkContext.setLogLevel("INFO")
i
Hey @Andreas Hald, sounds like an interesting use case! Would you mind sharing a bit more about your goal with lakeFS + Iceberg + Spark Streaming? I see a 10s timeout and a Once trigger; do you want to add a checkpoint to avoid repeating that work, or something else? As for your question, I think that
s3a://<lakefs-repo>/<lakefs-branch>/<path-to-checkpoint>/
should work. Did you try that by any chance?
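For example, something along these lines is what I had in mind (an untested sketch; the repo test123 and branch main come from your config, but the checkpoints/article_titles prefix is just a placeholder for whatever path you want):

# Checkpoint lives in the lakeFS repo itself, addressed through the S3A gateway:
# s3a://<lakefs-repo>/<lakefs-branch>/<any-prefix-you-like>/
checkpoint_path = "s3a://test123/main/checkpoints/article_titles/"

query = (df.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", checkpoint_path)
    .toTable("lakefs.main.db.table1"))
query.awaitTermination(timeout=10)

Since fs.s3a.endpoint already points at lakeFS, the checkpoint files should be written into (and versioned in) the repo alongside the table data.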
a
That did it, thanks a bunch! I could have sworn I tried that combination, but I must have included the bucket name as well. The project is currently at hobby level. I am used to working in Spark + Delta land and wanted to check out another of the open table formats, and since Iceberg needs a catalog I opted for lakeFS instead of Nessie (I really like the idea of being able to use lakeFS for non-Spark jobs as well and to generalize the Write-Audit-Publish pattern). Spark Structured Streaming was specifically to avoid all my pipelines being batch when it is not necessary. The 10s timeout was a remnant of me being in configuration mode, where I was sometimes seeing jobs hang for a very long time when Spark was configured slightly wrong; I should have cleaned that up before posting here 😅
Hi Itai, I have a follow-up question which I thought best to post here since I am using the same configuration (except for the timeout part). I am struggling a little with making the historical queries work for Iceberg, such as:
SELECT * FROM catalog.db.orders.history;
Which gives me a
org.apache.iceberg.exceptions.RuntimeIOException: Failed to refresh the table
(It of course also gave me a way longer stack-trace which I'll be happy to post if it makes sense) and the
SELECT * FROM prod.db.table FOR VERSION AS OF 'dev-branch';
gives me
org.apache.iceberg.exceptions.ValidationException: Cannot find matching snapshot ID or reference name for version
On the flip side selecting the actual dev branch works
SELECT * FROM dev_branch.db.table;
It could well be that I am missing something on the Iceberg side as well, but as far as I understand, the commands above should work out of the box for Iceberg. Any advice would be greatly appreciated and thanks in advance!
i
SELECT * FROM catalog.db.orders.history;
Which gives me a
org.apache.iceberg.exceptions.RuntimeIOException: Failed to refresh the table
I believe the reference to the table should be of the form catalog.ref.db.table, and yours is catalog.db.table. Can you try using the reference, like dev_branch, after the catalog?
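Concretely, something like this is what I mean (a sketch with placeholder names, assuming the lakefs catalog from your config and a table db.table on the dev_branch branch):

# the branch (ref) goes right after the catalog name: catalog.ref.db.table
spark.sql("SELECT * FROM lakefs.dev_branch.db.table LIMIT 10").show()

# Iceberg metadata tables then hang off the fully qualified name
spark.sql("SELECT * FROM lakefs.dev_branch.db.table.history").show()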
a
Sorry, I was already using ref.db.table (I have .config("spark.sql.defaultCatalog", "lakefs") set, so the catalog prefix is implicit); the earlier form was a typo by me. Just so I avoid confusing myself, I'll use the actual query:
spark.sql("SELECT * FROM main.db_seeking_alpha.api_query_article_list.history;")
The stack trace further includes: Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on s3a://test123/db_seeking_alpha/db_seeking_alpha/api_query_article_list/history/metadata/v0.metadata.json: I notice that db_seeking_alpha appears in the URL twice for some reason, and I wouldn't expect a history folder to be there either (but this could be my limited knowledge of Iceberg).
o
Hi @Andreas Hald, is db_seeking_alpha mentioned anywhere in your configuration?
a
Hi Offir! Nope, the config looks like this. I am using PySpark with lakeFS / Postgres, and MinIO as an S3 replacement while developing locally:
spark_jars = [
    "org.apache.hadoop:hadoop-aws:3.2.2",
    "com.amazonaws:aws-java-sdk-bundle:1.11.683",
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0",
    "io.lakefs:lakefs-iceberg:v0.1.2",
]

spark = (
    SparkSession.builder.appName("lakeFS sample")
    .config("spark.jars.packages", ",".join(spark_jars))
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
    .config(
        "spark.sql.catalog.lakefs.catalog-impl",
        "io.lakefs.iceberg.LakeFSCatalog",
    )
    .config("spark.sql.catalog.lakefs.warehouse", "lakefs://test123")
    .config("spark.sql.catalog.lakefs.uri", "http://localhost:8000")
    .config("spark.sql.catalog.lakefs.cache-enabled", "false")
    .config("spark.sql.defaultCatalog", "lakefs")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
    .config("spark.hadoop.fs.s3a.access.key", "testimctesti")
    .config("spark.hadoop.fs.s3a.secret.key", "testimctesti")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)
Ah, setting .config("spark.sql.catalog.lakefs.cache-enabled", "true") solved it!
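For anyone who finds this thread later, with that change the history query from earlier now runs for me (the table names are from my project, swap in your own):

# catalog.ref.db.table.history resolves once cache-enabled is set to "true"
spark.sql(
    "SELECT * FROM lakefs.main.db_seeking_alpha.api_query_article_list.history"
).show()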
c
Are you using Databricks?