
Comments (16)

tdas commented on July 3, 2024

First of all, thank you very much for trying out Delta Lake! The current version (0.1.0) has a very restrictive conflict detection check to be absolutely safe. In future releases, we will slowly relax the conflict criteria to allow more concurrency while ensuring the ACID guarantees. Hopefully, we will be able to make such workloads easier.

from delta.

tdas commented on July 3, 2024

We have improved our concurrency control in this commit - f328300

This allows operations on disjoint partitions to be concurrently written.
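
The idea behind the improved check can be illustrated with a small sketch (hypothetical, not Delta's actual implementation): two concurrent transactions may both commit only if the sets of partitions they touched are disjoint.

```python
# Hypothetical sketch of partition-level conflict detection.
# Not Delta's actual code -- just the idea behind allowing
# concurrent writes to disjoint partitions.

def partitions_touched(txn):
    """Union of partitions a transaction added files to or removed files from."""
    return txn["partitions_added"] | txn["partitions_removed"]

def conflicts(txn_a, txn_b):
    """Two transactions conflict if they touched any common partition."""
    return bool(partitions_touched(txn_a) & partitions_touched(txn_b))

txn1 = {"partitions_added": {"dt=2019-01-01"}, "partitions_removed": set()}
txn2 = {"partitions_added": {"dt=2019-01-02"}, "partitions_removed": set()}
txn3 = {"partitions_added": {"dt=2019-01-01"}, "partitions_removed": set()}

assert not conflicts(txn1, txn2)  # disjoint partitions: both may commit
assert conflicts(txn1, txn3)      # same partition: the later commit must retry
```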

tdas commented on July 3, 2024

Delta Lake currently works with full guarantees only on HDFS, because HDFS provides the file system operation guarantees that Delta Lake's consistency depends on. S3 does not provide those guarantees yet, primarily because it lacks list-after-write consistency.
Details on the required guarantees - https://github.com/delta-io/delta#transaction-protocol
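
The reason these guarantees matter: Delta commits version N by creating the Nth file in the transaction log, and correctness requires that exactly one writer can create that file. A simplified sketch (hypothetical, not Delta's actual code) of the required put-if-absent behavior:

```python
# Hypothetical sketch of why Delta's log protocol needs mutually exclusive
# file creation. Delta commits version N by creating _delta_log/<N>.json;
# exactly one concurrent writer may win that creation.

class AtomicLog:
    """In-memory stand-in for a file system offering put-if-absent."""
    def __init__(self):
        self.files = {}

    def create_exclusive(self, name, data):
        if name in self.files:       # a second writer must fail, not overwrite
            raise FileExistsError(name)
        self.files[name] = data

log = AtomicLog()
log.create_exclusive("00000000000000000001.json", "txn A")
try:
    log.create_exclusive("00000000000000000001.json", "txn B")
    committed_twice = True           # would be a lost update
except FileExistsError:
    committed_twice = False          # the loser retries on top of version 1

assert not committed_twice
```

Without list-after-write consistency on S3, a writer cannot reliably discover the latest version before attempting the next one, so this exclusivity cannot be enforced by the file system alone.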

PS: This is a completely different issue than the original issue in this thread. Please open a separate issue for it.

liwensun commented on July 3, 2024

Thanks for sharing your use case and the great discussion. I have created a concurrency support tracking issue which references this issue so people can see the use cases and discussion here.

koertkuipers commented on July 3, 2024

i am not sure it is straightforward to safely allow concurrent writes that replace partitions. the optimistic transaction seems to know what files were added or deleted, but that's not the same as knowing what the intent of the transaction was.

for example a transaction might have had the intent to replace everything where say a=1, so replaceWhere("a=1"), and let's say there was nothing to delete and it only wrote out to a=1/b=1/part.snappy
if another transaction ran at the same time and also had a replaceWhere("a=1") and also deleted nothing but created a file a=1/b=2/part.snappy, then by just looking at the file actions they do not seem to be in conflict, but they are.
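
the example above can be sketched concretely (hypothetical code, just to show the gap between file-level and intent-level checks):

```python
# Hypothetical sketch of the scenario above: a file-level conflict check
# misses a conflict between two replaceWhere("a=1") transactions whose
# written files happen not to overlap.

txn1 = {"predicate": "a=1", "removed": set(), "added": {"a=1/b=1/part.snappy"}}
txn2 = {"predicate": "a=1", "removed": set(), "added": {"a=1/b=2/part.snappy"}}

def file_level_conflict(t1, t2):
    """Naive check: do the transactions touch any common file?"""
    return bool((t1["added"] | t1["removed"]) & (t2["added"] | t2["removed"]))

def predicate_conflict(t1, t2):
    """Intent-aware check; simplified here to 'identical predicates overlap'."""
    return t1["predicate"] == t2["predicate"]

assert not file_level_conflict(txn1, txn2)  # looks safe from file actions alone
assert predicate_conflict(txn1, txn2)       # but both intended to replace a=1
```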

hackmad commented on July 3, 2024

I can appreciate the challenges in designing something like this. However, it means that existing processes that use the Parquet format to load partitions simultaneously cannot be converted over to Delta: it essentially serializes all stages that could run in parallel. It might be worth having an option to load all the data first and then update partition information in the metastore, similar to how you would have to do it in Athena. If a new pipeline is created, we would have to work around this by first loading the secondary partition-level data into its own S3 locations without partitioning and then organizing it later. This would incur the overhead of additional storage (which could be mitigated with retention policies in S3) but, more importantly, more than double the compute cost by processing the data a second time.

hospadar commented on July 3, 2024

Wanted to add to this - this would be a blocker for us as well to switch from parquet to delta.

Right now we store our underlying data in s3 as parquet and do management of partitions fairly manually to keep tables in a happy state. We always write out new (or replacement) partitions to a new folder then just swap the location of the partition in the metastore to make it look like an atomic update to anyone querying the data downstream (also allows us to theoretically roll back an update, although that's impractical for us and requires log spelunking to find the old paths).

We often do big backfill/reprocessing jobs where we process tons of dates in parallel to keep the cluster over-committed. If we could only write one partition at once our throughput would slow down quite a bit on jobs like this.

I'd love to switch to delta, it would make it MUCH easier to revert data to earlier states (and a variety of other things would become more convenient for us), but this issue is probably a blocker.

Our logic goes something like:

///// First thread is doing something like:
val path = "s3://warehouse/" + UUID.randomUUID().toString
dataframe.write.parquet(path)
spark.sql(s"ALTER TABLE target PARTITION (dt='2019-01-01') SET LOCATION '$path'")

///// Another thread doing the same thing, but for a different date:
val path2 = "s3://warehouse/" + UUID.randomUUID().toString
dataframe.write.parquet(path2)
// register the second dataframe to a different partition
spark.sql(s"ALTER TABLE target PARTITION (dt='2019-01-02') SET LOCATION '$path2'")

wernerdaehn commented on July 3, 2024

I would assume this limitation exists because Delta supports ACID within a table. If you have two sessions writing into different partitions, that would need different transaction handling compared to the situation where a single writer writes into all partitions.
It might be harder to implement than it looks at first sight.

Having said that, I would love to have such option as well. There will be situations where you need mass data loads and would be okay with a relaxed transaction guarantee. And there will be situations where transaction guarantees are more important than mass data performance.

my2cents

ligao101 commented on July 3, 2024

hello, I am looking into how delta would work for an s3-based data lake. what are the current limitations of running delta lake on s3? This issue appears to be one of the potential limitations we could see. Thanks!

koertkuipers commented on July 3, 2024

@calvinlfer i agree that concurrent writes to totally separate partitions would be great.

however i was surprised to hear you say parquet supports this just fine. we have run into issues with this using dynamic partitionOverwriteMode and partitionBy, because both writers try to create temporary files in baseDir/_temporary, leading to weird errors when one finishes and deletes _temporary while the other job is still running. just FYI.

koertkuipers commented on July 3, 2024

note that for the example of dynamic partition overwrite (which is not in delta but we added on our own branch) it is easy to reason about, because the files deleted are always in exact same partitions as where files are added, so you only need to check for conflicts with respect to added files (e.g. verify the transactions did not write to exact same partitions).
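
a sketch of that reasoning (hypothetical code, not the actual branch): because dynamic partition overwrite only ever deletes files in partitions it also writes to, comparing the partitions of added files is sufficient.

```python
# Hypothetical sketch: for dynamic partition overwrite, every deleted file
# lives in a partition that also received new files, so conflict detection
# only needs to compare the partitions of *added* files.

def overwritten_partitions(txn):
    """Partitions receiving new files; deletions are confined to these."""
    return {path.rsplit("/", 1)[0] for path in txn["added"]}

txn1 = {"added": {"dt=2019-01-01/part-0.snappy"}}
txn2 = {"added": {"dt=2019-01-02/part-0.snappy"}}

# No common overwritten partition, so the two overwrites can commit concurrently.
assert not (overwritten_partitions(txn1) & overwritten_partitions(txn2))
```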

calvinlfer commented on July 3, 2024

Hey @koertkuipers, what version and flavor of Spark are you using? @hackmad and I have seen this work at scale on the Databricks platform with Spark 2.4.1.

koertkuipers commented on July 3, 2024

@calvinlfer we use spark 2.4.1 on hadoop 2.7
however i am a little uncertain whether that's the version we observed the issue with, or if it was an earlier version and we have avoided the situation ever since. i remember the errors being hdfs lease exceptions, because one job would delete the _temporary directory while the other was still using it.

koertkuipers commented on July 3, 2024

@calvinlfer maybe things changed for the better. i now see when i run two jobs writing to different partitions using partitionBy and dynamic partitionOverwriteMode:

drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-be40030e-8eef-4680-85ac-b55e6519df60/partition=2
Found 1 items
drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-d25f16d3-8f2d-4cf4-89bd-09256469b5e5/partition=1
Found 1 items
drwxr-xr-x   - koert koert          0 2019-08-29 18:17 out/_temporary/0

so it seems each job has its own .spark-staging directory, and _temporary isn't really used? not sure...

calvinlfer commented on July 3, 2024

Sorry, I should have mentioned this more explicitly earlier, but we used S3 instead of HDFS, so I believe the underlying implementation is quite different and allows for concurrent writes to non-conflicting partitions.

koertkuipers commented on July 3, 2024

@calvinlfer i did some more checking, and the issue of writers conflicting with each other when writing to the same baseDir with dynamic partition overwrite does still exist in spark 2.4 and spark master for all file sources. i cannot say anything about writing to s3; that could be very different.
for more info please see (and vote for):
https://issues.apache.org/jira/browse/SPARK-28945
