# help
j
Hi. I'm seeing a performance problem with lakeFS when making a modification to a branch that includes a lot of partitioned files. I'll preface this by saying there are probably better ways of doing what I attempted on my first attempt than backfilling 840 million rows of data into lakeFS.

Basically, we have 840 million rows of data to import across 29 files; each file has 30M rows in it. We are partitioning the data by timestamp down to the hour: year, month, day, hour. Because I was processing one raw file at a time and didn't commit after each one, there wound up being multiple parquet files in each partition folder. The total number of partitioned parquet files wound up being around 840K, all around 12K in size.

To fix this, I did a repartition of the partitioned folder in Spark. The farther along it gets, the longer the response times from your S3 gateway seem to become. During the repartition operation, if I go to the lakeFS UI, click on the Uncommitted Changes tab, and drill into the partitioned branch folders, it takes around 30 seconds to actually render the UI, presumably the time it takes to enumerate all the files on S3. The Spark repartition job is seeing the same kind of S3 response delays, and eventually I get a socket read timeout exception many hours into the repartition operation. The error stack trace is in the attached text snippet. This error is reproducible every time.

So my question is: how do I fix this problem, and is this performance degradation over time expected?
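For context, a compaction pass over an hour-partitioned dataset like the one described usually looks roughly like the sketch below; the paths, target folder, and column names here are placeholders based on the description above, not taken from the actual job.

```python
from pyspark.sql import SparkSession

# Illustrative compaction pass; assumes a SparkSession already configured to
# talk to the lakeFS S3 gateway (as in the config shown below). Paths and
# column names are placeholders.
spark = SparkSession.builder.appName("compact-hourly-partitions").getOrCreate()

src = "s3a://ct-raw/partitioned/partitioned-data/"            # many ~12 KB files
dst = "s3a://ct-raw/partitioned/partitioned-data-compacted/"  # hypothetical target

df = spark.read.parquet(src)

# Shuffle so each year/month/day/hour partition is written as a few larger
# files instead of dozens of tiny ones.
(df.repartition("year", "month", "day", "hour")
   .write
   .partitionBy("year", "month", "day", "hour")
   .mode("overwrite")
   .parquet(dst))
```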
The spark session was configured like this:
Copy code
with SparkSession.builder.appName(appName) \
            .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.endpoint", lakefsEndPoint) \
            .config("spark.hadoop.fs.s3a.path.style.access", "true") \
            .config("spark.hadoop.fs.s3a.access.key", lakefsAccessKey) \
            .config("spark.hadoop.fs.s3a.secret.key", lakefsSecretKey) \
            .config("spark.executor.memory", '2g') \
            .getOrCreate() as spark:
There is probably a way to increase the Spark connection timeout in that session config, but I want to address the performance issue first and get some feedback.
Also, the lakeFS server itself is running as an ECS Fargate service task. It's currently using a minimum configuration of 1 vCPU and 4 GB of RAM.
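For what it's worth, the S3A timeouts mentioned above can be raised in the same session builder. This is a minimal sketch using standard Hadoop S3A properties with illustrative values; it may paper over the read timeouts but does not address the underlying slowdown.

```python
from pyspark.sql import SparkSession

# Illustrative only: bumping S3A socket/connection timeouts and retries.
spark = (
    SparkSession.builder.appName("lakefs-s3a-timeouts")
    # socket read timeout, in milliseconds
    .config("spark.hadoop.fs.s3a.connection.timeout", "600000")
    # TCP connection establishment timeout, in milliseconds
    .config("spark.hadoop.fs.s3a.connection.establish.timeout", "60000")
    # number of times the S3A client retries a failed request
    .config("spark.hadoop.fs.s3a.attempts.maximum", "10")
    .getOrCreate()
)
```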
n
Hi @Joe M are you working in an AWS S3 env?
The most efficient way to work with lakeFS and spark is using our hadoopFS implementation together with pre-signed URLs. You can read the configuration documentation here
j
Hi, yes, s3 is backing our lakefs server.
I'll take a look at the documentation page, thanks. I am definitely not using the presigned mode... nor did I even think about that, lol.
n
This will definitely give you a performance boost
j
Thanks, I'll apply the different configuration and try again.
n
Good luck
a
Hi @Joe M, if I understand correctly, you are repartitioning 800K objects, all of which are uncommitted. If so, you may run into https://github.com/treeverse/lakeFS/issues/2092. While we have not yet released a fix for this, a good workaround (if this is indeed the issue) is to first commit, then repartition, then commit again.
j
Thanks for the link to the issue. Hopefully the presigned URL usage will fix the problem for now. I forgot to mention that I had committed the 800K files first, before I started the repartition. All the files committed successfully within about 60 seconds. Then I ran the repartition, which timed out after many hours.
A question on the presigned mode: I'm using PySpark on an EMR Spark cluster. The current Spark configuration I was using sets these properties:
Copy code
.config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.endpoint", lakefsEndPoint) \
            .config("spark.hadoop.fs.s3a.path.style.access", "true") \
            .config("spark.hadoop.fs.s3a.access.key", lakefsAccessKey) \
            .config("spark.hadoop.fs.s3a.secret.key", lakefsSecretKey)
I see that the presigned configuration uses lakeFS-specific properties under fs.lakefs.*:
Copy code
sc._jsc.hadoopConfiguration().set("fs.lakefs.access.mode", "presigned")
sc._jsc.hadoopConfiguration().set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
sc._jsc.hadoopConfiguration().set("fs.lakefs.access.key", "AKIAlakefs12345EXAMPLE")
sc._jsc.hadoopConfiguration().set("fs.lakefs.secret.key", "abc/lakefs/1234567bPxRfiCYEXAMPLEKEY")
sc._jsc.hadoopConfiguration().set("fs.lakefs.endpoint", "<https://example-org.us-east-1.lakefscloud.io/api/v1>")
Do I need to include/install any Python packages for this to work?
These are the only imports I currently have:
Copy code
import os
import boto3
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, hour, current_timezone, to_utc_timestamp
n
No need to install any additional Python packages. The only change you need to make is to use the lakeFS scheme for your paths, i.e. instead of
s3a://repo/branch/path-to-object
use
lakefs://repo/branch/path-to-object
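Concretely, once the lakeFS Hadoop filesystem is configured, only the URI scheme in the read/write calls changes. A small sketch, assuming an already-configured Spark session; the repo, branch, and paths are illustrative:

```python
# With fs.lakefs.* configured, reads and writes use lakefs:// instead of s3a://.
# Repo, branch, and paths below are placeholders.
df = spark.read.parquet("lakefs://example-repo/main/raw-data/")

(df.write
   .partitionBy("year", "month", "day", "hour")
   .mode("overwrite")
   .parquet("lakefs://example-repo/main/partitioned-data/"))
```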
j
got it, thanks.
Hi, I'm getting this exception when running with the above configuration. I assume there's some Java package that needs to get added somewhere?
Copy code
24/05/31 15:38:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
loading partition from lakefs://ct-raw/partitioned/partitioned-data/
24/05/31 15:38:28 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
24/05/31 15:38:28 INFO SharedState: Warehouse path is 'file:/mnt/c/src/ct/badger/badger_be/load-scripts/spark-warehouse'.
24/05/31 15:38:29 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: lakefs://ct-raw/partitioned/partitioned-data/.
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.lakefs.LakeFSFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
I figured it out. You need this on the PySpark session creation:
Copy code
.config("spark.jars.packages", "io.lakefs:hadoop-lakefs-assembly:0.2.4")
you might want to add that to the PySpark section of the code sample here: https://docs.lakefs.io/integrations/spark.html#hadoop-filesystem-in-presigned-mode
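Putting the pieces from this thread together, a PySpark session for presigned mode ends up looking roughly like the sketch below. The endpoint and keys are the placeholder values from the docs, and the spark.hadoop.* prefix is just the builder equivalent of the hadoopConfiguration().set(...) calls shown earlier.

```python
from pyspark.sql import SparkSession

# Rough consolidation of the presigned-mode settings discussed in this thread.
# Endpoint and keys are placeholders.
spark = (
    SparkSession.builder.appName("lakefs-presigned")
    # pulls io.lakefs.LakeFSFileSystem onto the classpath
    .config("spark.jars.packages", "io.lakefs:hadoop-lakefs-assembly:0.2.4")
    .config("spark.hadoop.fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
    .config("spark.hadoop.fs.lakefs.access.mode", "presigned")
    .config("spark.hadoop.fs.lakefs.endpoint", "https://example-org.us-east-1.lakefscloud.io/api/v1")
    .config("spark.hadoop.fs.lakefs.access.key", "AKIAlakefs12345EXAMPLE")
    .config("spark.hadoop.fs.lakefs.secret.key", "abc/lakefs/1234567bPxRfiCYEXAMPLEKEY")
    .getOrCreate()
)
```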
Hmm, adding that spark.jars.packages fixed the problem locally, but not on EMR. still getting this error:
Copy code
24/05/31 23:58:59 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: lakefs://ct-raw/partitioned/partitioned-data/.
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.lakefs.LakeFSFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2693) ~[hadoop-client-api-3.3.6-amzn-3.jar:?]
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3628) ~[hadoop-client-api-3.3.6-amzn-3.jar:?]
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3663) ~[hadoop-client-api-3.3.6-amzn-3.jar:?]
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:173) ~[hadoop-client-api-3.3.6-amzn-3.jar:?]
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3767) ~[hadoop-client-api-3.3.6-amzn-3.jar:?]
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3718) ~[hadoop-client-api-3.3.6-amzn-3.jar:?]
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:564) ~[hadoop-client-api-3.3.6-amzn-3.jar:?]
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:373) ~[hadoop-client-api-3.3.6-amzn-3.jar:?]
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53) ~[spark-sql_2.12-3.5.0-amzn-1.jar:3.5.0-amzn-1]
lol, I figured out that you need to specify the --packages option on the EMR step creation. That made it run on EMR.
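For anyone else hitting this, the EMR step just needs to carry the --packages flag through spark-submit. A hedged sketch using boto3; the cluster ID, script location, and step name are hypothetical, and only the --packages argument is the relevant part.

```python
import boto3

# Hypothetical EMR step submission; the important bit is --packages, so the
# lakeFS Hadoop FS assembly is resolved on the cluster.
emr = boto3.client("emr", region_name="us-east-1")
emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # placeholder cluster ID
    Steps=[{
        "Name": "repartition-with-lakefs",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--packages", "io.lakefs:hadoop-lakefs-assembly:0.2.4",
                "s3://example-bucket/scripts/repartition_job.py",  # placeholder script
            ],
        },
    }],
)
```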
OK, so I got the same error, but apparently for a different reason. The repartitioning and writing out of the actual files worked and was actually really fast. However, something at the end caused the read timeout this time? Possibly while the lakeFS server was updating the metadata in the Postgres DB or whatever? I see files in all the partitioned folders and they're all the correct size. So something happened at the end.
I'm going to provide the entire stderr and stdout from the application in the EMR job. There are actually several exceptions that occurred: some timeouts, some connection resets, and even some read errors. You can search for all the exceptions in the stderr file. The exception I pasted above was in the stdout file, which I think is just what I logged at the end when the app finally failed.
o
Hi @Joe M Thanks for the logs and information. We will have a look into this
j
Thanks. Also, FYI, here is the script that I ran on the EMR cluster.
a
Hi @Joe M, I continue to believe that it might be the same issue. Here's a hacky workaround: perform several commits concurrently with your repartitioning. lakeFS is designed for this to be a safe operation. They don't need to happen from your Spark job, just try to commit every 10k or so files. Would that work for you?
j
Hi. The problem is that partial commits are out of my control. Let me give you an update. My previous scheme for partitioning was loading one raw data file at a time and partitioning it until all raw files were processed, and then I would commit. With that process, I could at least commit after each raw file, because the Spark job would end. The problem with that approach is that partitioning down to the hour was taking 2.5 hours per file.

I have since realized that you can pass a path (more or less) to Spark and have it process all the files at once. I ran this process without lakeFS (going straight to S3), and it processed all the files in roughly the same time frame that a single file used to take: all 840M rows across 29 files were processed with one Spark command and finished in 2.5 hours. So clearly the bottleneck is the S3 file operations being slow due to the sheer number of files involved when partitioning by hour.

Given the speed increase I have stumbled on outside of lakeFS by processing all files at once, the ability to commit is really out of my control. If I were to use the lakeFS presigned-URL approach, I would pass the raw files path to Spark but point it at lakeFS, and it would process all 29 files at once and not commit anything until all the raw files were processed and the partition call actually returned.
a
Sorry for not explaining better. Run Spark to repartition, and every 20 minutes or so, commit from another process. Even your CLI. I believe this hack could be the fastest way to unblock you right now, until we fix the underlying issue. (Also, if it works, it would indicate that this is indeed the issue that I suspect.)
j
Hmm, let me ponder that idea and experiment. Thanks for the idea.
So, I have another question, because I don't think this will work. Since Spark is writing files directly to lakeFS on the specific branch (thus those files are showing up in the Uncommitted Changes tab in the UI), I don't think we would have data integrity if I just randomly commit while Spark is working, right? If Spark is in the middle of updating a file that I'm committing, what will happen? Will it commit partially updated and/or corrupted files? Also, this means that I would have to ingest into a new branch each time, because I'd want to merge the branch with all the changes (once Spark completes) back to the partitioned branch so it's atomic.
Oh, maybe committing a file while it's being updated is not a problem here, since we're dealing with S3, which wouldn't show the contents of the updated file until the update was done.
n
Hi @Joe M, there's no risk to data integrity if you commit during the Spark job. An object is written to lakeFS only after it is completely uploaded to the storage. lakeFS will simply take all the uncommitted changes at that point in time and write them into a new commit. Any objects that were not picked up by that commit will remain in the staging area (uncommitted).
👍 1
a
To amplify what @Niro says: safe concurrent writes and commits have always been an explicit requirement in lakeFS. We designed and implemented the whole thing around this requirement, and we do not allow performance to trump this correctness requirement. (Correctness is the kind of thing that makes for really boring announcements, so we probably don't say it enough. But it's obviously a top requirement for a product that manages data!)
j
OK, thanks. I don't know if I'm going to have time to work on a modified script to do the commits in parallel. Do you have a sense of what/where the problem is? It seems improbable that nobody else is seeing this issue. I can't even get our first real load attempt to complete. We may just have to wait for a fix before we can continue our tests to see if this will work for us.
a
I believe that this is the issue. I'll be sure once you commit during the run and can confirm that this helps. Of course, how severely it affects a user depends on many factors; you may have managed to hit a bunch of the bad cases. Sorry.
j
Is there a timeframe for that to be fixed? I don't think I'm going to be able to get to testing this scenario with parallel commits. I have a lot of other stuff on my plate at the moment.
e
Hi @Joe M, this issue is not likely to be fixed in the next couple of months. The reasons for that are that you are the second person to report it, and it has a straightforward workaround that shouldn't take long to implement. You are welcome to share your thoughts on the GitHub issue and see if the community votes to prioritise it. You are also welcome to contribute a fix, of course.
j
The workaround being to commit in parallel?
👍 1
ok, thanks.
e
Yes. Would an example code snippet for the workaround be helpful for you?
j
Probably not. The way I would do it would be to use the concurrent.futures APIs in Python to launch the Spark partition process in a background thread, and then monitor the number of uncommitted files in the repo branch on the main thread, committing as appropriate...
...until the partition future is marked as completed
👍 1
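A rough sketch of that shape of workaround, assuming lakectl is available where the driver script runs; it uses a simple time-based trigger rather than counting uncommitted files, and the repo/branch names and the body of run_repartition are placeholders.

```python
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor

from pyspark.sql import SparkSession

# Sketch: run the Spark repartition in a background thread and commit the
# branch periodically from the main thread. The session is assumed to be
# configured for lakeFS as in the earlier snippets.
spark = SparkSession.builder.appName("repartition-with-periodic-commits").getOrCreate()

BRANCH_URI = "lakefs://example-repo/ingest"  # placeholder repo/branch

def run_repartition():
    df = spark.read.parquet(f"{BRANCH_URI}/raw-data/")
    (df.repartition("year", "month", "day", "hour")
       .write.partitionBy("year", "month", "day", "hour")
       .mode("overwrite")
       .parquet(f"{BRANCH_URI}/partitioned-data/"))

def commit_branch(message):
    # Committing concurrently with writes is safe in lakeFS: objects become
    # visible only once fully uploaded, and anything missed stays staged for
    # the next commit.
    subprocess.run(["lakectl", "commit", BRANCH_URI, "-m", message], check=False)

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(run_repartition)
    while not future.done():
        time.sleep(600)  # commit every ~10 minutes while the job runs
        commit_branch("periodic commit during repartition")
    future.result()      # re-raise any Spark exception from the background job
    commit_branch("final commit after repartition")
```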
So, one thing I realized with this approach is that, while the final state of the branch will be correct, committing every 10K uncommitted files in the early stages of the Spark job will be committing temporary files to the branch. Ultimately, on the final commit, when the temp directory is deleted, only the final partitioned files will remain. So while this works, it's adding unnecessary history, and thus storage space, to the repo. I assume running a GC on the repo after this process would delete any files no longer in any branch?
Also, another question: why are uncommitted changes on a branch showing up in the "Objects" tab in the UI for that same branch? If an unrelated process is reading from the objects API on this branch while the partitioning is happening, would it see the uncommitted changes?
e
Hi @…, to your first question, yes, GC will delete objects not pointed to by any branch or commit ID. To your second question, no, accessing a branch will not include uncommitted data. When accessing the branch one actually accesses a commit ID, either head (the latest commit) or a previous commit, and hence will only see committed data.
j
ok, but why is the objects tab showing uncommitted files? is that doing something other than showing the latest commit?
a
Hi @Joe M, you are correct, so I obviously should explain. The branch head ref includes uncommitted data. It needs to be there, otherwise programs like Spark would be unable to run. Basically, the branch head ref has the same semantics as an S3 bucket; that's why distributed data processing works. But what if you need to access only committed objects on the branch head? Use a @ suffix. So the ref main@ sees the latest commit on main, but no uncommitted objects. In fact, anything other than a branch head ref is immutable and shows no uncommitted objects. This includes tags, commit digests, abbreviated commit digests, and any ref expression such as main~1.
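In Spark terms, and assuming the lakeFS Hadoop filesystem resolves the @ form in paths (repo and branch names follow the ones used earlier in this thread), that looks like:

```python
# Branch head: committed plus uncommitted objects (S3-bucket-like semantics).
live = spark.read.parquet("lakefs://ct-raw/partitioned/partitioned-data/")

# Committed head only: the '@' suffix pins the ref to the latest commit, so
# objects written by an in-flight job are not visible here.
committed = spark.read.parquet("lakefs://ct-raw/partitioned@/partitioned-data/")
```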
j
ah, ok, that makes sense. thanks.
I'm running an EMR job now that is doing the background commit every 10K files. I'll let you know how it goes.
👍 1
a
tl;dr: try ct-ref@
j
it seems to be committing every 65 seconds so far.
a
Well, hopefully it won't slow down... so this rate will continue. 🙂
j
fwiw, the lakefs UI is completely unresponsive while this job is running at this point
image.png
a
Uggh. Any chance of sharing lakeFS logs from this period of time, and configuration?
j
Yeah, I'll gather all that once this finishes. The job is still progressing.
And strangely, when I closed that browser tab and reopened it, it was responsive again.
One other question, going back to GC: is there a way to run it through the lakeFS Python SDK or the CLI, or do you have to do it through Spark?
a
GC is a Spark job. You're probably sick of hearing this by now: Of course lakeFS Cloud manages this for you 👼🏼 .
j
lol
jumping lakefs 1
a
If restarting the tab of the web UI fixed things, logs probably won't help. Something client-side.
j
Well, the job failed. Here are the EMR logs with the stack trace.
It ran about 1 hour and 9 minutes out of the expected 2.5 hours it should take for the whole thing to run.
IIRC that's about how long it went last time, though the exception is slightly different this time. Still a socket timeout/closed exception.
a
Thanks! We will probably only be able to take a look at your logs tomorrow. Is there any chance of getting server-side logs (lakeFS) for that period of time? Particularly for the 15 minutes near the job failure, of course.
j
I will try to figure out where the logs are and provide them. Also, FYI, the commits continued to go every minute and only take milliseconds, right up until the failure.
Also of note, we're currently using 1.21.0. I haven't upgraded the box to 1.25 yet, though I imagine that won't matter.
Argh... I wonder if Spark is dying because it's trying to clean up temp files that don't exist anymore in the path it wrote them to, because I committed them. The majority of the exceptions are trying to delete files that it says don't exist.
I've seen that happen before with Spark.
Here are the lakeFS server logs. You'll have to sort the rows by the first column, because the stupid S3 console exporter doesn't sort the logs when it writes them to S3.
Oh, I misread the exception. It looks like it's the same timeout issue, though this time while Spark is trying to bulk delete its temp files from lakeFS.
a
The good news is that we finished writing! But... this one might be harder to handle. The Spark OutputCommitter really likes to delete its files in bulk; even committing from the outside might not help here, since you would need a sudden burst in commit rate. All the DeleteObjects are from just 3 minutes.
• Your KV is probably overloaded. Is this DynamoDB, or something else?
Alternatives might be to configure Spark:
• Keep using lakeFSFS, but set mapreduce.fileoutputcommitter.cleanup-failures.ignored or even mapreduce.fileoutputcommitter.failures.attempts? (ref)
• Do you want to go back to trying to use S3A?
  ◦ Can you set fs.s3a.directory.marker.retention to keep? (ref)
j
The KV is a Postgres DB, and it's probably a t1.micro.
I don't think ignoring the delete failures is an option. That will leave temporary files in the branch after the final commit.
I think mapreduce.fileoutputcommitter.failures.attempts sounds hopeful; it looks like it defaults to 1. I can try upping it to 20 or something.
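If you try that route, the setting goes through the Hadoop configuration like the others; a minimal sketch with the value floated above (whether the committer actually applies it to its cleanup deletes is the open question):

```python
from pyspark.sql import SparkSession

# Sketch: allow the FileOutputCommitter more attempts before giving up.
# 20 is just the number mentioned above, not a tuned value.
spark = (
    SparkSession.builder.appName("repartition-with-committer-retries")
    .config("spark.hadoop.mapreduce.fileoutputcommitter.failures.attempts", "20")
    .getOrCreate()
)
```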
a
Agree that allowing more failures should help. I hope it also applies to deletes, thankfully I haven't looked at that Spark code in ages
> The KV is a Postgres DB, and it's probably a t1.micro.
That probably does not have sufficient IOPS. Also, I'd want a more resilient database setup for production.
> I don't think ignoring the delete failures is an option. That will leave temporary files in the branch after the final commit.
Well, you'd still be deleting them, only after the Spark job, or by calling lakectl fs rm from the driver. In any case, failure to delete is a recoverable failure.
j
Yeah, the KV t1.micro is just in our dev environment, for testing and cost. I will try setting the attempts first to see if that fixes it.
Actually, the Postgres instance is technically a db.t4g.micro.
image.png
We're not going to get a big IOPS performance boost by scaling vertically within the t4g group.
Also, the t4g.micro is using a 20 GB local gp2 SSD. There's no EBS drive, so I don't think it's possible to provision the IOPS.
So I'm not sure the disk itself is the issue.
Though I do see there are other options available. Do you think the issue is with the KV store for sure? If so, I think it makes more sense to experiment with scaling up the Postgres instance.
a
I suspect KV performance. This is a really tough setup to tune, sorry. Tuning is trial and error even at the best of times. Postgres is not our primary high-performance KV, and I remember very little of how to tune raw Postgres on a small machine. I remember there's a bunch of config values that you need to bump, but I've not done any of that for several years. Any chance of collecting metrics? That's the /metrics endpoint. They're intended for Prometheus, but yours doesn't sound like a k8s setup. However, you could periodically fetch those into a sequence of files, and maybe we could fake it from those. Or run lakeFS with DEBUG logging, I think, and get durations of many operations.
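A throwaway way to capture those without Prometheus, assuming the lakeFS API is reachable from wherever this runs; the URL and interval are placeholders.

```python
import time
import urllib.request

# Sketch: snapshot the Prometheus-format /metrics endpoint into timestamped
# files so they can be inspected (or replayed) later. URL is a placeholder
# for the actual lakeFS server address.
METRICS_URL = "http://lakefs.example.internal:8000/metrics"

while True:
    ts = time.strftime("%Y%m%dT%H%M%S")
    with urllib.request.urlopen(METRICS_URL, timeout=10) as resp:
        body = resp.read()
    with open(f"lakefs-metrics-{ts}.txt", "wb") as out:
        out.write(body)
    time.sleep(30)  # scrape interval
```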
j
what is your high performance kv store? Dynamo?
👍 1
I suppose another relatively easy test would be to reconfigure lakeFS to use the default DynamoDB config and see if that works out of the box.
If it does, then clearly the KV store is the issue.
Also, I was looking at the sizing page in the docs. It looks like it's possible to use the lakectl abuse command with the write test against the Postgres instance?
a
You also get good CloudWatch monitoring for DynamoDB. It will show you whether you're being throttled, and with some effort even which partitions. Managed services heart lakefs
j
Yeah, I was looking at the base monitoring for the Postgres instance. It was definitely being hit pretty hard, though the DB itself was only reporting a max of about 25 ms write lag and 15 ms read lag.
I think I'll try a test with DynamoDB and see how that goes.
👍 1
OK, I got a new lakeFS instance set up now using DynamoDB. I'm going to run the same EMR job on that setup now.
The job is running, but I have a feeling it's going to fail. This is concerning, lol.
That's the lakeFS server box. CPU is pegged at 100%.
It's possible that box is the bottleneck and not the DB. We'll see.
Man, the writes are way slower. It started out committing about every 2 minutes; it's up to every 4 minutes now. This is committing every 10K files. It's been running for 3.5 hours. Going straight to S3 without lakeFS at all, it finished in 2.5 hours.
This error occurred early in the process
Yeah, it looks like one of the tasks within Spark died. It's not writing out some of the partitions for some years at all. I'm going to kill this job. Here's the output from the metrics endpoint.
lakefs-metrics.txt
dynamo metrics
The graph for the lakeFS server is the same as it was above.
At this point, I'm going to run one more test, scaling up the lakeFS server box with more CPUs and memory to see if I can get the processor to not be pegged at 100%. I'm also going to go back to the Postgres DB setup. If that doesn't fix it, we're probably going to have to move on from this solution.
e
I hope it goes well. I believe that finding the right AWS deployment for your scale may require a bit more effort on your part. lakeFS is being used at this scale and more.
j
Hi, yes, I agree. I will try to scale up to some reasonable level. However, if it gets to the point that this seemingly fairly simple use case requires crazy and costly resources, then we can't go down this path.
e
It shouldn't get to that at your scale, but tweaking is required.
lakefs 1
a
What can I say? For better or worse, Spark solves any problem as a big data problem.