# help
j
Starting a new thread. I made 2 changes this time. I updated the Postgres database instance to an m7g.large, with 20GB of drive space and 3000 IOPS, and I changed the client-side Postgres fields in the config.yaml file to: max_open_connections: 100, max_idle_connections: 100, connection_max_lifetime: 3h. The job got all the way to the end: all the partitioned data was successfully written out after about 2 hrs 50 minutes, which is what I expected. Then the job died on the final bulk delete. The cleanup retry attempts were set to 100; it retried 100 times and then died. The ignore-cleanup-failures setting didn't seem to do anything, as an exception was still thrown out of the Spark job to my Python code.
[Attached: lakeFS server graphs, Postgres database graphs, and server logs]
You can see that the total IOPS on the last DB graph never really got above 1000. In fact, it was consistently dropping until the end, where it spiked because of the bulk delete but was still well under 3000.
I don't know what else to do. I feel at this point that this is a reasonably scaled system for this operation that we're doing.
i
Hey @Joe M, does this mean you made some progress, or is the job stuck at the same place? There's something that is not working during the bulk deletes, and I'm sure we'll figure it out. Some questions/suggestions:
1. Is there some other action being performed on that branch during the bulk deletes? If it's changing the branch HEAD (like a commit, merge, etc.), it may cause some retries.
2. The default bulk size is 1000. I suggest setting it to something lower, using fs.lakefs.delete.bulk_size, to see if that helps. My guess is that 50 is low enough.
3. If you can, please collect metrics from lakeFS just before the bulk delete and right after it ends (successfully or with failure). That could help narrow down the root cause.
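For reference, a minimal sketch of how that setting could be passed to a PySpark job, assuming fs.lakefs.* keys are forwarded as Hadoop properties via the spark.hadoop. prefix (the app name and exact wiring here are illustrative, not from this thread):

from pyspark.sql import SparkSession

# Lower the lakeFS bulk delete size from the default 1000 to 50
# (sketch; assumes fs.lakefs.* keys pass through as Hadoop configuration)
spark = (
    SparkSession.builder
    .appName("backfill-partitioning")
    .config("spark.hadoop.fs.lakefs.delete.bulk_size", "50")
    .getOrCreate()
)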
j
Hey. Yes, it got all the way to the end this last time, way further than it did any other time before. And yes, while the bulk delete is happening, I'm still committing every 10K uncommitted files. I have no way to stop my commits during the bulk delete, because I have no visibility into when the Spark job is doing the bulk delete vs. the partitioning work.
I can try again with that setting set to 50
i
> I'm still committing every 10K uncommitted files.
Can you please explain this logic? Just listing the changes and if it reaches 10K then commit?
j
Per guidance from Ariel, I launch the Spark job in a background thread (future), and on the main thread I commit every 10K changes to keep the number of files in the uncommitted area down. Specifically, the 10K check does this:
from datetime import datetime

def check_commit(branch, check_count):
    # Count the uncommitted changes on the branch, capped at check_count
    diffs = branch.uncommitted(max_amount=check_count)
    count = sum(1 for _ in diffs)
    log_info_message(f"{count} changes found on branch")
    if count >= check_count:
        # Commit once the threshold is reached, tagging the commit with a timestamp
        log_info_message(f"{check_count} changes detected, committing now")
        commit_time = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
        branch.commit(message=f"committing changes at {commit_time}")
That function is called with check_count = 10000 roughly every 60 seconds.
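For context, a minimal sketch of that driver loop, assuming the Spark/EMR step is wrapped in a callable (run_spark_job is a made-up name here) and check_commit is the function above:

import time
from concurrent.futures import ThreadPoolExecutor

def run_with_periodic_commits(branch, run_spark_job, check_count=10_000, interval_s=60):
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(run_spark_job)    # Spark job runs in a background future
        while not future.done():               # main thread wakes up roughly once a minute
            check_commit(branch, check_count)  # commits only if >= check_count changes
            time.sleep(interval_s)
        future.result()                        # re-raises any exception from the Spark job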
i
A commit will cause all in-flight deleteObjects calls to fail and retry on the server side. I guess that with a certain parallelization level from Spark and a bulk_size of 1000, you're bound to have deleteObjects calls that don't succeed after 3 retries.
I saw Ariel's guidance, and I think it's valid, though not necessarily for this step.
I think it will pass with the 50 bulk size. If that's not the case, we can discuss other options.
j
OK, I'll try that. The other thing is there's a MapReduce flag, mapreduce.fileoutputcommitter.cleanup.skipped, that I can set to true. But if I do that, I'll have to run an S3 delete command on the temp folder at the end, and I don't know whether that will cause timeouts anyway.
For now I'll change the bulk delete size to 50.
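For completeness, a sketch of how that flag could be set in the Spark session, again assuming it is forwarded as a Hadoop property via the spark.hadoop. prefix. As noted above, skipping cleanup means the committer's _temporary folder under the output path would then have to be deleted separately:

from pyspark.sql import SparkSession

# Sketch only: skip the FileOutputCommitter's final cleanup step
spark = (
    SparkSession.builder
    .appName("backfill-partitioning")
    .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped", "true")
    .getOrCreate()
)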
Out of curiosity at this point, can you explain what it is about what I'm trying to do that is actually an "edge case"? It seems like everyone would have these problems to some degree.
i
Honestly I don't have enough observability into your environment to answer that. It's definitely taking longer than usual. When our customers face similar issues, we're able to troubleshoot and solve quickly.
j
Our use case is:
• We have 29 .csv.gz files, with a total of 840 million rows across all the files.
• I pass the path to the files to the Spark job and read all the files (spark_session.read.option("header", "true").csv(raw_data_path)).
• We then partition the dataframe by year, month, day, hour (file1Df.write.mode("overwrite").partitionBy("year","month","day","hour").parquet(partitioned_data_path)).
That's the entire operation.
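Put together, the whole operation is roughly the following sketch (the placeholder paths stand in for whatever the job is actually given):

from pyspark.sql import SparkSession

spark_session = SparkSession.builder.appName("backfill").getOrCreate()

raw_data_path = "s3a://<location-of-the-29-csv-gz-files>/"      # placeholder
partitioned_data_path = "s3a://<location-for-parquet-output>/"  # placeholder

# Read the 29 .csv.gz files (~840 million rows in total)
file1Df = spark_session.read.option("header", "true").csv(raw_data_path)

# Partition by year/month/day/hour and write out as Parquet
(
    file1Df.write
    .mode("overwrite")
    .partitionBy("year", "month", "day", "hour")
    .parquet(partitioned_data_path)
)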
This is our one-time "backfill" operation, where we're trying to load historical data into the partitions.
Partitioning by hour is admittedly rather high cardinality, but I imagine there are other cases that could produce similar cardinalities.
But that cardinality is what's causing the number of files we're dealing with to be large.
Also, there are 7 years' worth of data in those 840 million rows.
So 24 partitions/day * 365 days/year * 7 years = 61,320 partitions will be created by this job, and there will be multiple files in each partition because Spark is processing data from more than one raw .csv.gz file.
i
How many files are generated overall?
And we’re talking about a single lakeFS server, right?
j
Yeah, 1 lakeFS server. It's an ECS cluster with 1 task, 4 vCPU and 8GB of RAM. I think Spark is creating one partition file per raw file that it processes, so presumably it would be 61,320 partitions * 29 files.
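A quick back-of-the-envelope check of that estimate:

partitions = 24 * 365 * 7    # 61,320 hourly partitions over 7 years
files = partitions * 29      # roughly one output file per raw .csv.gz per partition
print(partitions, files)     # 61320 1778280 -> close to 2M objects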
I'll also say that the format the customer gave us the data in is exactly the worst-case scenario for partitioning. Each file has some number of entities (primary keys), with all 7 years of data for those entities. That basically means the job has to work across the 61,320 partitions for every raw file it processes. If they gave us the data (or we pre-processed it outside of lakeFS first) so that a given file holds one month of data for all entities, the number of partitions it would have to work with per file would be far lower.
Basically, each file would touch 24 partitions/day * 31-ish days = 744 partitions.
In the end the same number of partitions would be created, obviously, but per file it's less work. I don't know whether that would make any difference performance-wise.
i
So almost 2M objects. With the way Spark works, I'd estimate 10M-20M API calls. They're spread across 2-3 hours, but they can become quite bursty. Why just a single lakeFS server? I would use at least 3 replicas (ECS tasks) for this.
j
Wait, what? You can have more than 1 lakeFS server pointing at the same repos?
i
Also, I'm not the biggest Spark expert, but I guess repartition would do wonders for your job. Ideally, with that amount of data and partitions, you don't need more than a single file per partition, unless you're only treating this as a POC and actually want many files.
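As a sketch, that suggestion could look like the following in the write step above, assuming the dataframe is repartitioned by the same columns used in partitionBy so each partition ends up with roughly one file:

# file1Df and partitioned_data_path as in the read/write snippet earlier in the thread
(
    file1Df
    .repartition("year", "month", "day", "hour")  # group each hour's rows into one task
    .write
    .mode("overwrite")
    .partitionBy("year", "month", "day", "hour")
    .parquet(partitioned_data_path)
)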
j
Is there any documentation on how to set up a multi-instance lakeFS server cluster?
i
> Wait, what? You can have more than 1 lakeFS server pointing at the same repos?
Of course! We rely on the DB for concurrency guarantees.
Just increase your ECS instances; lakeFS supports this out of the box.
j
Each ECS task would have a different IP though, right? My EMR script is pointing Spark at a specific IP.
i.e. lakefs_end_point = 'http://10.0.1.237:8000'
i
I believe ECS provides this capability. It gives you some DNS / IP as the endpoint, no?
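For illustration only: if the ECS service sits behind a load balancer or service-discovery DNS name, the hard-coded task IP could be replaced with that name. The DNS name and the fs.lakefs.endpoint wiring below are assumptions, not something confirmed in this thread:

from pyspark.sql import SparkSession

# Hypothetical DNS name for the lakeFS ECS service instead of a single task IP
lakefs_end_point = "http://lakefs.internal.example.com:8000"

spark = (
    SparkSession.builder
    .config("spark.hadoop.fs.lakefs.endpoint", f"{lakefs_end_point}/api/v1")
    .getOrCreate()
)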
j
Hmm, OK, this potentially changes a lot. Let me look into the ECS service/task stuff you're talking about. In the meantime I'll rerun with that bulk delete max set to 50. I won't get to this until later today, so I'll respond with results when I have them.
Also, regarding the repartition: I could put a repartition() call in there, but presumably that would increase the file operations. I'll investigate that too.
i
Sure, let us know if that worked