#help
Guy Hardonag

09/15/2022, 1:26 PM
Welcome back 😄 As far as I know the first error has nothing to do with lakeFS: writing without overwrite mode to an existing path should fail. Looking into the second error… I'll be back with an answer today
Iddo Avneri

09/15/2022, 2:07 PM
You might want to use this notebook - could be easier: https://github.com/treeverse/lakeFS-samples/tree/main/03-apache-spark-python-demo
Robin Moffatt

09/15/2022, 2:11 PM
oh cool @Iddo Avneri, that looks useful. thanks!
Iddo Avneri

09/15/2022, 2:46 PM
There is also a YouTube video going over setting up one of these (the Delta one specifically, but Spark will be the same):

https://youtu.be/knbiXy6mWNg

Barak Amar

09/15/2022, 3:59 PM
@Robin Moffatt in your notebook I've configured the spark session this way:
from pyspark.context import SparkContext
from pyspark import SparkFiles
from pyspark.sql.session import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.hadoop.fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE") \
    .config("spark.hadoop.fs.s3a.secret.key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://lakefs:8000") \
    .getOrCreate()
sc = spark.sparkContext
4:00 PM
Using the above, s3a will go through the lakeFS S3 gateway and write/overwrite the data.
Robin Moffatt

09/15/2022, 4:11 PM
Thanks @Barak Amar. So does that override https://github.com/treeverse/lakeFS/blob/master/deployments/compose/etc/hive-site.xml#L51-L62 and do something different?
Barak Amar

09/15/2022, 4:15 PM
right, I need to check why the spark inside the notebook doesn't use these values.
4:15 PM
did my session setup work for you?
Robin Moffatt

09/15/2022, 4:15 PM
It seems to use something because it writes the first dataframe just fine
4:16 PM
I'm trying your config on the notebook now
4:16 PM
No, I get the same error
4:16 PM
Caused by: java.io.FileNotFoundException: 
No such file or directory: s3a://example/remove_pii/demo/users/part-00000-7a0bbe79-a3e2-4355-984e-bd8b950a4e0c-c000.snappy.parquet
4:17 PM
I'm using the Everything Bagel setup, if that makes any difference?
Barak Amar

09/15/2022, 4:17 PM
the error is on the second branch
4:18 PM
do you see the branch in the UI?
Robin Moffatt

09/15/2022, 4:18 PM
let me check
4:19 PM
so is something weird happening with the overwrite behaviour?
4:20 PM
it's removing it to overwrite it but expects it to be there when it then writes it again?
4:20 PM
/me grasps at straws with his very limited understanding of this stuff
Barak Amar

09/15/2022, 4:22 PM
the last error you pasted was from a request to read the file?
Robin Moffatt

09/15/2022, 4:22 PM
no, a write statement
df2.write.mode('overwrite').parquet('s3a://'+repo+'/'+branch+'/demo/users')
Barak Amar

09/15/2022, 4:24 PM
ok, I'll be next to my laptop a bit later; I'll rerun the notebook and get back with better answers in an hour
Robin Moffatt

09/15/2022, 4:28 PM
thanks!
Barak Amar

09/15/2022, 6:21 PM
Hi Robin, the problem is that when the notebook removes the PII, it needs to read the data and then write it back. In this case you want to read the data from the main branch and write it to the remove_pii branch. Currently the remove_pii branch holds the right data (as of the point you branched), but the notebook writes to the same location it reads from.
6:22 PM
Update the notebook to read from 'main' and write to 'remove_pii':
repo = 'example'
xform_df = spark.read.parquet('s3a://'+repo+'/main/demo/users')
...
df2 = xform_df.drop('ip_address','birthdate','salary','email')
and later
df2.write.parquet('s3a://'+repo+'/remove_pii/demo/users')
6:23 PM
and it should work
6:26 PM
let me know if it is working for you and/or I need to better explain what I found so far.
Robin Moffatt

09/16/2022, 8:43 AM
hey @Barak Amar, thanks for this. By using your approach (read from main, write to branch) I got it to work. However I’m still not clear on this, because in my mental model of branching (based on git) I would expect to also read from my branch;
main
could have changed since I created my
remove_pii
branch, so reading from it feels incorrect. Can you help clarify this?
Barak Amar

09/16/2022, 9:10 AM
Sure. The issue is not directly related to lakeFS: we try to transform and overwrite the same path, so the source data is lost as part of the overwrite process.
9:13 AM
From the lakeFS point of view, you can read from the commit ID the branch was created from; that should work too.
9:14 AM
When using the branch name, the read and the write look at the same location: the branch's staging area.
9:39 AM
Use get_branch (https://pydocs.lakefs.io/docs/BranchesApi.html#get_branch) to fetch the reference for the 'remove_pii' branch before you overwrite. When you think about it, this enables a nice solution to problems like https://community.databricks.com/s/question/0D53f00001HKHkSCAX/spark-how-to-simultaneously-read-from-and-write-to-the-same-parquet-file Hope it makes sense.
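[Editor's note] A minimal sketch of the commit-pinned read described above, assuming the lakefs_client Python package with an already-configured `client` and Spark session `spark`; the `commit_pinned_path` helper and the variable names are illustrative, not part of any lakeFS API:

```python
# Sketch: read from the commit the branch points at, write back to the branch.
# `client` (lakefs_client) and `spark` (SparkSession) are assumed to exist.

def commit_pinned_path(repo: str, commit_id: str, path: str) -> str:
    """Build an s3a URI that reads from an immutable commit rather than a branch head."""
    return f's3a://{repo}/{commit_id}/{path}'

repo, branch = 'example', 'remove_pii'

# ref = client.branches_api.get_branch(repository=repo, branch=branch)
# src = commit_pinned_path(repo, ref.commit_id, 'demo/users')
# xform_df = spark.read.parquet(src)
# df2 = xform_df.drop('ip_address', 'birthdate', 'salary', 'email')
# df2.write.mode('overwrite').parquet(f's3a://{repo}/{branch}/demo/users')
```

Because the commit ID is immutable, the read is no longer affected by the overwrite deleting the branch's current copy of the path.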
Robin Moffatt

09/16/2022, 9:40 AM
I think the issue here is that I'm also learning pyspark & parquet files etc. as I go, so apologies for confusing things with general misunderstanding 😉
9:41 AM
Use the reference from https://pydocs.lakefs.io/docs/BranchesApi.html#get_branch from ‘remove_pii’ branch before you override.
so in pseudocode I would call get_branch and use the ref it returns to read the file (instead of reading via the branch name), is that right?
Barak Amar

09/16/2022, 9:42 AM
Yes, about the branch information - I think it would be the right way to go. About spark - learning the pitfalls with you, so this is great 🙂
9:43 AM
You don't need all the code from the docs - like you did with create branch, you already have a client.
9:43 AM
I'm here if you need anything else to get it working.
r

Robin Moffatt

09/16/2022, 9:43 AM
thanks, I appreciate it!
9:51 AM
this also seems to work - read from the branch, but use
.cache()
on the data frame before writing it https://stackoverflow.com/a/65330116/350613
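[Editor's note] The reason .cache() can help is Spark's laziness: the write re-evaluates the lazy read plan after 'overwrite' has already deleted the source files. As a rough illustration (a plain-Python stand-in, not lakeFS- or Spark-specific), an eager read materializes the data first, which makes overwriting the same path in place safe:

```python
# Illustration of the read-then-overwrite-in-place hazard.
# Python's read() is eager (like caching a DataFrame and forcing an action),
# so the source is fully in memory before the overwrite deletes it.
import os
import tempfile

def eager_transform_in_place(path: str) -> str:
    """Read a file eagerly, transform it, and overwrite it at the same path."""
    with open(path) as f:
        data = f.read()          # fully materialized before the write begins
    transformed = data.upper()
    with open(path, 'w') as f:   # safe: we no longer need to re-read the source
        f.write(transformed)
    return transformed

with tempfile.TemporaryDirectory() as d:
    p = os.path.join(d, 'users.txt')
    with open(p, 'w') as f:
        f.write('alice')
    result = eager_transform_in_place(p)
```

A lazy reader would instead re-open the path during the write step, after the overwrite had removed it, which matches the FileNotFoundException seen earlier in the thread.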
Barak Amar

09/16/2022, 9:54 AM
Saw that one - I don't like to count on cache to prevent data access. You don't know when it won't have enough space and will fall back to the actual storage, or whether it works by writing the data and moving it at the end. So read/process/write without the cache will probably be faster for larger data sets.
9:55 AM
I assume there is no magic in software engineering - but I could be wrong.
Robin Moffatt

09/16/2022, 10:13 AM
got it. thanks.