Andreas Hald
07/12/2024, 4:04 PM
spark = SparkSessionIcebergLakefs().get_session()
df = (spark.readStream
.format("iceberg")
.option("stream-from-timestamp", "1720552141")
.load("lakefs.main.db.table")
)
table = "article_titles"
(df.writeStream
.format("iceberg")
.outputMode("append")
.trigger(once=True)
.option("checkpointLocation", <what-the-heck-do-i-put-here?>)  # <---- the part I am struggling with
.toTable(f"lakefs.main.db.table1")
.awaitTermination(timeout=10)
)
Spark config:
spark_jars = ["org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0",
"io.lakefs:lakefs-iceberg:0.1.4"]
spark = SparkSession.builder.appName(f"lakeFS sample / {repo.id}") \
.config("spark.jars.packages", ",".join(spark_jars)) \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog") \
.config("spark.sql.catalog.lakefs.warehouse", "lakefs://test123") \
.config("spark.sql.catalog.lakefs.uri", LAKEFSENDPOINT) \
.config("spark.sql.catalog.lakefs.cache-enabled", "false") \
.config("spark.sql.defaultCatalog", "lakefs") \
.config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
.config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.endpoint", LAKEFSENDPOINT) \
.config("spark.hadoop.fs.s3a.access.key", LAKEFSACCESSKEY) \
.config("spark.hadoop.fs.s3a.secret.key", LAKEFSSECRETKEY) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.getOrCreate()
spark.sparkContext.setLogLevel("INFO")
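One way to fill in the `checkpointLocation` gap in the question above is to point it at an object-store path inside the lakeFS repository itself, so the checkpoint is versioned alongside the branch. A minimal sketch, with hypothetical repo, branch, and path names (the function name is illustrative, not from the original code):

```python
# Hypothetical names; adjust to your own repo/branch. The checkpoint lives
# under s3a://<repo>/<branch>/<path>, served through the lakeFS S3 gateway.
repo, branch = "test123", "main"
checkpoint_location = f"s3a://{repo}/{branch}/_checkpoints/article_titles"


def start_write(df):
    # Sketch only: the same writeStream as above, with the checkpoint filled in.
    return (df.writeStream
              .format("iceberg")
              .outputMode("append")
              .trigger(once=True)
              .option("checkpointLocation", checkpoint_location)
              .toTable("lakefs.main.db.table1"))
```

The key design point is that the checkpoint then travels with the branch rather than living outside version control.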
Itai Admi
07/14/2024, 9:22 AM
s3a://<lakefs-repo>/<lakefs-branch>/<path-to-checkpoint>/
should work. Did you try that by any chance?

Andreas Hald
07/18/2024, 7:57 AM

Andreas Hald
07/29/2024, 1:31 PM
SELECT * FROM catalog.db.orders.history;
Which gives me a
org.apache.iceberg.exceptions.RuntimeIOException: Failed to refresh the table
(It of course also gave me a much longer stack trace, which I'll be happy to post if it helps.)
and the
SELECT * FROM prod.db.table FOR VERSION AS OF 'dev-branch';
gives me
org.apache.iceberg.exceptions.ValidationException: Cannot find matching snapshot ID or reference name for version
On the flip side, selecting the actual dev branch works:
SELECT * FROM dev_branch.db.table;
It could well be that I'm missing something on the Iceberg side as well, but as far as I understand, the commands above should work out of the box for Iceberg.
Any advice would be greatly appreciated, and thanks in advance!

Itai Admi
07/29/2024, 1:41 PM
> SELECT * FROM catalog.db.orders.history;
> Which gives me a
> org.apache.iceberg.exceptions.RuntimeIOException: Failed to refresh the table
I believe the reference to the table should be of the form catalog.ref.db.table, and yours is catalog.db.table. Can you try using the reference, like dev_branch, after the catalog?
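The naming scheme Itai describes can be sketched as a tiny helper (the helper name and all argument values here are illustrative placeholders, not part of the thread):

```python
def table_ref(catalog: str, ref: str, db: str, table: str) -> str:
    # With the lakeFS Iceberg catalog, the lakeFS branch (or tag) sits
    # between the catalog and the database: catalog.ref.db.table
    return f"{catalog}.{ref}.{db}.{table}"


# e.g. querying a table on the dev branch:
# spark.sql(f"SELECT * FROM {table_ref('lakefs', 'dev_branch', 'db', 'table')}")
```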
Andreas Hald
07/29/2024, 2:06 PM
ref.db.table
(I configured .config("spark.sql.defaultCatalog", "lakefs").) That was a typo by me.
Just so I avoid confusing myself, I'll use the actual query:
spark.sql("SELECT * FROM main.db_seeking_alpha.api_query_article_list.history;")
The stack trace further includes:
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on s3a://test123/db_seeking_alpha/db_seeking_alpha/api_query_article_list/history/metadata/v0.metadata.json:
I notice that db_seeking_alpha is in the URL twice for some reason, and I wouldn't expect a history folder to be there either (but this could be my limited knowledge of Iceberg).

Offir Cohen
07/29/2024, 2:52 PM

Andreas Hald
07/29/2024, 4:19 PM
spark_jars = [
"org.apache.hadoop:hadoop-aws:3.2.2",
"com.amazonaws:aws-java-sdk-bundle:1.11.683",
"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0",
"io.lakefs:lakefs-iceberg:v0.1.2",
]
spark = (
SparkSession.builder.appName("lakeFS sample")
.config("spark.jars.packages", ",".join(spark_jars))
.config(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
)
.config("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
.config(
"spark.sql.catalog.lakefs.catalog-impl",
"io.lakefs.iceberg.LakeFSCatalog",
)
.config(
"spark.sql.catalog.lakefs.warehouse", "lakefs://test123"
)
.config("spark.sql.catalog.lakefs.uri", "http://localhost:8000")
.config("spark.sql.catalog.lakefs.cache-enabled", "false")
.config("spark.sql.defaultCatalog", "lakefs")
.config(
"spark.hadoop.fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
)
.config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
.config("spark.hadoop.fs.s3a.access.key", "testimctesti")
.config("spark.hadoop.fs.s3a.secret.key", "testimctesti")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.getOrCreate()
)
Andreas Hald
07/29/2024, 7:11 PM
.config("spark.sql.catalog.lakefs.cache-enabled", "true")
solved it

Callum Dempsey Leach
08/14/2024, 3:04 PM
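To summarize the resolution of the thread: the only change that Andreas reports fixing the "Failed to refresh the table" error was flipping the catalog cache flag from the earlier config. A one-line sketch of that diff (hedged: whether caching should be enabled may depend on the lakefs-iceberg version in use):

```python
# Before (from the config posted earlier in the thread):
#   .config("spark.sql.catalog.lakefs.cache-enabled", "false")
# After (the change Andreas reports solved it):
conf = {"spark.sql.catalog.lakefs.cache-enabled": "true"}
```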