Comments (10)
It sounds like per-segment watermarks are the way to go there. That would mean we need a somewhat lower-level reader interface, though.
I am wondering what Pravega's big-picture plans for time and order handling are. So far, there is only ordering by key, but any elaborate event-time handling needs some more ordering, or at least information about the out-of-orderness bounds.
from flink-connectors.
There are a number of things to be mentioned here. I hope this comment doesn't get too long.
The main abstraction that Pravega exposes is the stream. Streams are internally implemented with segments, and composing segments gives us a stream. It is useful to think of Pravega as a segment store, where each segment is a sequence of bytes organized in temporal order. The core of Pravega does not really understand events, only bytes.
In principle, we can compose segments in any way we want, but we have chosen a few ways to compose them that make sense to the application; for example, scaling up corresponds to ending a segment and starting n others. Although this idea of composing segments is really powerful, it brings a fair amount of complexity when it comes to reasoning about order. For that reason, we have been avoiding, at all costs, exposing the segment abstraction. I also feel that manipulating segments directly leads to designs with a rigid structure that I'd like to avoid, e.g., a static mapping of partitions to sources.
We have talked about a low-level reader API because we want the ability to write multiple flavors of reader group rather than cram all possible features into one reader group class. But simply exposing all the segment manipulation would probably be a bad idea, because it would lead to confusion and bugs. The API should be minimal, yet powerful enough to enable the different flavors.
As for time tracking, we could offer time references without exposing the notion of segments. We could do it in a way that is assigned by applications, or at the AppendProcessor level in the segment store. My main concern here is having to guarantee monotonicity at the segment store when the application decides the timestamps. As a first cut, we don't really want the segment store service itself interpreting the bytes of a segment for anything. Would it be sufficient if the writers added app-provided timestamps on a per-segment basis, even if the timestamps aren't synchronized across writers?
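One way to sidestep the monotonicity concern is to have the writer clamp app-provided timestamps per segment before appending, so the segment store never has to interpret or order them. A minimal sketch of that idea (hypothetical class and method names, not the Pravega API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: a writer-side helper that makes app-provided timestamps
// non-decreasing within each segment. Illustrative only.
public class PerSegmentTimestamps {

    // last timestamp recorded per segment id
    private final Map<String, Long> lastPerSegment = new HashMap<>();

    /**
     * Returns the timestamp to record for this append: the app-provided
     * value, clamped so it never goes backwards within the segment.
     */
    public long stamp(String segmentId, long appTimestamp) {
        long last = lastPerSegment.getOrDefault(segmentId, Long.MIN_VALUE);
        long effective = Math.max(last, appTimestamp);
        lastPerSegment.put(segmentId, effective);
        return effective;
    }
}
```

Note that timestamps on different segments remain unsynchronized, matching the "per segment basis" framing above.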
In the A B C D example above, let's assume two cases: single reader and multiple readers. In the single-reader case, I'd expect the reader to hit the end of B before it finishes reading A. The reader would handle the sealed-segment response and request the successors from the controller. The reader then adds the successors to the state of the reader group, and it will pick them up itself because there is no one else. The multiple-reader case is pretty much the same, except that other readers might end up picking up the successors.
Prioritizing unassigned segments by their start time would make sense in the case where segments sit waiting for some time for a reader to pick them up. I believe we try to have them assigned as soon as possible, so if we are doing a good job at that, we don't really need the prioritization.
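For what it's worth, if such prioritization were ever needed, it is a small amount of mechanism: keep unassigned segments in a priority queue ordered by start time and hand out the head. A sketch with hypothetical types (there is no such class in the reader group today):

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Sketch: hand out unassigned segments oldest-first. Illustrative only.
public class UnassignedSegments {

    // Hypothetical descriptor: segment id plus the time at which it starts.
    public static final class SegmentInfo {
        public final String id;
        public final long startTime;

        public SegmentInfo(String id, long startTime) {
            this.id = id;
            this.startTime = startTime;
        }
    }

    private final PriorityQueue<SegmentInfo> queue =
            new PriorityQueue<>(Comparator.comparingLong((SegmentInfo s) -> s.startTime));

    public void add(SegmentInfo s) {
        queue.add(s);
    }

    /** Oldest unassigned segment first, or null if none. */
    public SegmentInfo acquire() {
        return queue.poll();
    }
}
```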
I think suggestions 1 and 3 are pretty much the same, but perhaps I'm missing the difference.
from flink-connectors.
This long comment is my way of saying that the Pravega connector (also the Kinesis connector) faces new challenges, and that we shouldn't assume that historical stream processing 'just works' with the wonderful determinism we've come to expect from Flink. I am not quite the expert that other folks here are; these are just my observations.
I find it useful to imagine there being two 'regimes' in stream processing. Consider a job started against some pre-existing stream. Processing begins with the oldest records, working through them as quickly as possible to reach the tail. This is the 'historical' regime. Once at the tail, the processor enters a 'real-time' regime. There are important differences. In the real-time regime, the various time domains are roughly aligned, elements are processed roughly in the order they're received, and the logic of watermark assignment can reasonably be delegated to the application. In the historical regime, there's an abundance of old data to process, system internals (like the implementation details in the reader group) heavily influence the processing order, and watermark assignment becomes more complex. Unless the connector is designed with historical processing in mind, the net effect is a loss of determinism, late records, etc.
Having per-partition assigners, in combination with watermark aggregation logic within the source and elsewhere, seems to be a good approach to decouple the app-specific aspects of assignment from the system-dependent aspects.
I feel the issue of non-determinism in historical processing is most acute in combination with dynamic work rebalancing and/or dynamic partitioning. Take the Kafka connector - each reader has a fixed set of partitions to process, and so the min-watermark calculation is rational. Throw in dynamic partitioning or rebalancing, and the possibility of late records is greatly increased.
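To make the min-watermark point concrete: with a fixed partition assignment, the subtask watermark is just the minimum over the per-partition watermarks, and it only moves forward as each partition advances. A dynamically acquired partition that starts far in the past immediately drags that minimum back, which is where late records come from. A plain-Java sketch of the aggregation (not the Kafka connector's actual code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: min-watermark aggregation across the partitions a subtask owns.
public class MinWatermarkTracker {

    private final Map<String, Long> perPartition = new HashMap<>();

    /** Record the latest watermark observed for one partition. */
    public void update(String partition, long watermark) {
        perPartition.put(partition, watermark);
    }

    /** Subtask watermark: minimum across all currently assigned partitions. */
    public long current() {
        return perPartition.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

With a static assignment the minimum is monotone; add rebalancing and a newly assigned old partition can pull it backwards, exactly the hazard described above.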
from flink-connectors.
I hope to find some time in the next few days to respond with more details, but for now here is a summary:
- From the use-case perspective, there is an incentive to make event time work.
- Event-time support does not strictly need order, but it does need a way to put a bound on the out-of-orderness.
- I understand Flavio's desire not to put ordering into the abstraction of Pravega's readers, and I think he is right about that.
- In many places, Flink cannot assume order either; network shuffles mess up order.
- Having a mechanism to compensate for that (buffering plus watermarks) is a great way to go from non-deterministic orders and out-of-orderness to deterministic results.
- We may not end up using the same mechanism in Pravega and the reader, but we could think along similar lines.
- Processing historic data should work in Flink in the future without watermarks and out-of-order bounds. You only need a watermark at the point where you switch from historic data to real-time data (when you catch up with the tail of the log). In between, it does not matter; the order may be completely different than in the original run.
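The 'buffering plus watermarks' mechanism in the summary above can be sketched as a reorder buffer: hold elements until the watermark passes their timestamp, then release them in timestamp order; anything arriving behind the watermark is a late record. This is illustrative only, and Flink's operators implement the idea with managed state and timers rather than an in-memory queue:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch: turn a non-deterministic arrival order into a deterministic
// timestamp order using buffering plus watermarks. Illustrative only.
public class ReorderBuffer<T> {

    private static final class Entry<T> {
        final long ts;
        final T value;
        Entry(long ts, T value) { this.ts = ts; this.value = value; }
    }

    private final PriorityQueue<Entry<T>> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Entry<T> e) -> e.ts));
    private long watermark = Long.MIN_VALUE;

    /** Returns false if the element is late (at or behind the current watermark). */
    public boolean add(long timestamp, T element) {
        if (timestamp <= watermark) {
            return false; // late record
        }
        buffer.add(new Entry<>(timestamp, element));
        return true;
    }

    /** Advance the watermark and release everything at or below it, in timestamp order. */
    public List<T> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<T> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().ts <= watermark) {
            out.add(buffer.poll().value);
        }
        return out;
    }
}
```

The released order is deterministic regardless of arrival order, which is exactly the point: non-deterministic order plus a bound on out-of-orderness yields deterministic results.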
from flink-connectors.
For me, a key question is the semantics of the application-specific watermark generation function. Ideally the function conveys domain knowledge (e.g. IOT specifics) without knowing the system internals (e.g. how the reader group selects segments). Meanwhile, the watermark produced by the function is aggregated by the system at various levels, e.g. across subtask instances and across partitions in the Kafka case.
We have considerable flexibility in how the Pravega source invokes the watermark function and performs aggregation. Could we somehow simplify the app developer's burden of writing a correct watermark function? Here are a few hypothetical solutions with diagrams. Note the function-like syntax W(...), which means a stateful instance of a watermark function.
First, the data we'll be working with: three keys (A, B, C) with six segments (S1..S6). In this example, the producer is well-behaved, producing ascending timestamps per key.
┌────┬───┬───────┐ ┌────┐┌────┐┌────┐┌────┐
│1 │2 │4 │ A │ A1 ││ A2 ││ A3 ││ .. │
│ │ │ A │ └────┘└────┘└────┘└────┘
│ │ A ├───┬───┤ ┌────┐┌────┐┌────┐┌────┐
│ A │ B │5 │6 │ B │ B1 ││ B2 ││ B3 ││ .. │
│ B │ │ B │ │ └────┘└────┘└────┘└────┘
│ C ├───┴───┤ B │ ┌────┐┌────┐┌────┐┌────┐
│ │3 │ C │ C │ C1 ││ C2 ││ C3 ││ .. │
│ │ C │ │ └────┘└────┘└────┘└────┘
└────┴───────┴───┘
1) Watermark function per routing key. This is an ideal solution because it matches Pravega's natural ordering guarantee and hides all system internals. In this example, the app could simply use the predefined AscendingTimestampExtractor function.
┌────┐┌────┐┌────┐┌────┐
W(│ A1 ││ A2 ││ A3 ││ .. │)
└────┘└────┘└────┘└────┘
┌────┐┌────┐┌────┐┌────┐
W(│ B1 ││ B2 ││ B3 ││ .. │)
└────┘└────┘└────┘└────┘
┌────┐┌────┐┌────┐┌────┐
W(│ C1 ││ C2 ││ C3 ││ .. │)
└────┘└────┘└────┘└────┘
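Under option (1), each per-key function can assume ascending timestamps, so the per-key watermark is simply the last timestamp seen minus one (mirroring how an ascending-timestamps extractor behaves), and the source can take the minimum across keys. A plain-Java sketch of that aggregation (hypothetical class, not connector code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: per-routing-key watermark tracking with ascending timestamps,
// aggregated to a source watermark by taking the minimum. Illustrative only.
public class PerKeyAscendingWatermarks {

    private final Map<String, Long> maxTsPerKey = new HashMap<>();

    /** Record an event for a key; assumes timestamps ascend per key. */
    public void onEvent(String key, long timestamp) {
        maxTsPerKey.merge(key, timestamp, Math::max);
    }

    /** Per-key watermark: last (max) timestamp seen, minus one. */
    public long keyWatermark(String key) {
        return maxTsPerKey.getOrDefault(key, Long.MIN_VALUE + 1) - 1;
    }

    /** Source watermark: minimum over all keys seen so far. */
    public long sourceWatermark() {
        return maxTsPerKey.values().stream()
                .mapToLong(ts -> ts - 1)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```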
2) Watermark function per segment. Here, the function must deal with ordering across keys, but may be written as though the stream consists of one fixed segment.
┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐
W(│ A1 ││ B1 ││ A2 ││ B2 ││ B3 ││ C1 ││ C2 ││ A3 ││ .. │)
└────┘└────┘└────┘└────┘└────┘└────┘└────┘└────┘└────┘
3) Watermark function per sub-task. Here, the function must deal with ordering across keys and with ordering of segment processing. This diagram reflects the current implementation.
╔════╗╔════╗╔════╗╔════╗
W(║ S1 ║║ S2 ║║ S5 ║║ .. ║)
╚════╝╚════╝╚════╝╚════╝
╔════╗╔════╗╔════╗╔════╗╔════╗
W(║ - ║║ S3 ║║ S6 ║║ S4 ║║ .. ║)
╚════╝╚════╝╚════╝╚════╝╚════╝
For reference, here's the source for commonly-used watermark functions:
from flink-connectors.
In the above examples, do the W() groupings also correspond to the data received by one reader?
Sections 1) and 2) use A1, B1, C3, etc. to label events; 3) uses S1-S5. Do these refer to segments? If so, the order of segments listed would violate ordering guarantees, and, that aside, this is not the way the code works now.
You did not mention having a watermark function per stream or per reader group. These are also options.
However, at the end of the day, I think we are most interested in event time and not write time, in which case the only extractor that really makes sense is BoundedOutOfOrdernessTimestampExtractor, as we don't control event time.
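The core of such a bounded-out-of-orderness extractor is just: track the maximum event timestamp seen and emit watermark = max - bound, which is the essence of what Flink's BoundedOutOfOrdernessTimestampExtractor does. A minimal standalone sketch (not the Flink class itself):

```java
// Sketch: bounded-out-of-orderness watermarking in isolation. Illustrative only.
public class BoundedOutOfOrderness {

    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrderness(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Called per element with its extracted event-time timestamp. */
    public void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Watermark: max timestamp seen, minus the out-of-orderness bound. */
    public long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE // no elements yet; avoid underflow
                : maxTimestamp - maxOutOfOrdernessMillis;
    }
}
```

The bound states how far behind the max an element may arrive and still be on time; anything later than that is a late record.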
from flink-connectors.
Sorry, diagram (3) is not very clear. It illustrates only the order of segment processing as experienced by a particular (stateful) watermark function instance. It does not imply that S5 and S6 would be processed concurrently. I stand by my assertion that a subtask could process S6 before S4.
from flink-connectors.
The client does not return segments, so a segment is never 'processed'. If you mean that some events in segment 4 could come after some events in segment 6, then yes. That shouldn't be surprising: they are in totally independent parts of the routing-key space, and the events were written in parallel, so it is only natural that they would be read in parallel. It is also possible, in the above example, that A1, A2, A3 will all be read before C1, or all after. The contract is ordering by key. The segments are an irrelevant implementation detail; there is no way to know they exist at the API level. Hence the Flink connector should not know or care about their existence or structure.
from flink-connectors.
I think that Tom is rightly concerned about keeping segments as an implementation detail. The reason they are mentioned is because segments affect processing order in today's reader group implementation. Sorry for any undue implications about exposing segments to users.
from flink-connectors.
Just a note, there's improved support for tracking a global watermark at the Flink level, based on a general-purpose 'global aggregate tracker' provided by the JM. Might be useful.
https://issues.apache.org/jira/browse/FLINK-10886
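Stripped of the Flink plumbing, the idea behind such a global aggregate is that each subtask periodically reports its local watermark and the tracker keeps the minimum across all reporters. A toy sketch (not the actual FLINK-10886 API):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: a JM-side global minimum over subtask-local watermarks. Illustrative only.
public class GlobalWatermarkTracker {

    private final ConcurrentMap<Integer, Long> perSubtask = new ConcurrentHashMap<>();

    /** A subtask reports its local watermark; returns the new global minimum. */
    public long report(int subtaskIndex, long localWatermark) {
        perSubtask.put(subtaskIndex, localWatermark);
        return perSubtask.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```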
from flink-connectors.