Comments (10)

StephanEwen commented on September 27, 2024

It sounds like per-segment watermarks are a way to go there. That would mean that we need a somewhat lower level reader interface, though.

I am wondering what Pravega's big-picture plans on time and order handling are. So far, there is only order by key, but any elaborate event-time handling needs some more ordering, or at least information about the out-of-orderness bounds.


fpj commented on September 27, 2024

There are a number of things to be mentioned here. I hope this comment doesn't get too long.

The main abstraction that Pravega exposes is the stream. Streams are internally implemented with segments, and composing segments gives us a stream. It is useful to think of Pravega as a segment store, where each segment is a sequence of bytes organized according to temporal order. The core of Pravega does not really understand events, only bytes.

In principle, we can compose segments in any way we want, but we have chosen a few ways to compose them that make sense to the application; for example, scaling up corresponds to ending a segment and starting n others. Although this idea of composing segments is really powerful, it brings a fair level of complexity when it comes to reasoning about order. For that reason, we have been avoiding, at all costs, exposing the segment abstraction. I also feel that manipulating segments directly leads to designs that have some rigid structure that I'd like to avoid, e.g., the mapping of partitions to sources being static.

We have talked about a low-level reader API because we want the ability to write multiple flavors of reader group rather than cramming all possible features into one reader group class. But simply exposing all the segment manipulation would probably be a bad idea, because it would lead to confusion and bugs. The API should be minimal, yet powerful enough to enable the different flavors.

As for time tracking, we could offer time references without exposing the notion of segments. Timestamps could be assigned by applications or at the AppendProcessor level in the segment store. My main concern here is having to guarantee monotonicity at the segment store when the application decides the timestamps. As a first cut, we don't really want the segment store service itself interpreting the bytes of a segment for anything. Would it be sufficient if the writers added app-provided timestamps on a per-segment basis, even if the timestamps aren't synchronized across writers?
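To make the question concrete, here is a minimal sketch of what a writer-side API for app-provided time references could look like. Everything in it is hypothetical, for illustration only; none of these names are claimed to be part of the Pravega client API.

    import java.util.concurrent.CompletableFuture;

    // Hypothetical sketch only; not an existing Pravega interface.
    public interface TimestampedEventWriter<T> extends AutoCloseable {

        // Append an event under a routing key, as in the existing writer API.
        CompletableFuture<Void> writeEvent(String routingKey, T event);

        // Let the application attach a time reference to its current write
        // position. The segment store could record this per segment without
        // ever interpreting the event bytes themselves.
        void noteTime(long applicationTimestamp);
    }

Under this shape, the store would presumably only need to enforce monotonicity per writer and per segment, not across writers, which matches the question above.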

In the A B C D example above, let's assume two cases: single reader and multiple readers. For the single-reader case, I'd expect the reader to hit the end of B before it finishes reading A. The reader would handle the sealed-segment response and would request the successors from the controller. The reader then adds the successors to the state of the reader group, and it will pick them up itself because there is no one else. The case of multiple readers is pretty much the same, except that other readers might end up picking them up.
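For clarity, here is a rough sketch of that sealed-segment hand-off. Every type and method name in it is an illustrative stand-in, not the actual Pravega client API.

    import java.util.List;

    // Illustrative stand-ins only, not the real Pravega client API.
    class SealedSegmentFlowSketch {
        interface Controller { List<String> getSuccessors(String segment); }
        interface ReaderGroupState { void addUnassigned(List<String> segments); }
        interface SegmentReader {
            boolean isSealed();
            byte[] readNext(); // returns null when no event is available yet
        }

        void readUntilSealed(String segment, SegmentReader reader,
                             Controller controller, ReaderGroupState state) {
            while (!reader.isSealed()) {
                byte[] event = reader.readNext();
                if (event != null) {
                    process(event);
                }
            }
            // The segment has ended: fetch its successors from the controller
            // and publish them into the shared reader-group state, where this
            // reader, or any other member of the group, may acquire them.
            state.addUnassigned(controller.getSuccessors(segment));
        }

        void process(byte[] event) { /* application logic */ }
    }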

Prioritizing unassigned segments by their start time would make sense if segments sat there for some time waiting for some reader to pick them up. I believe we try to have them assigned as soon as possible, so if we are doing a good job at it, then we don't really need that prioritization.

I think suggestions 1 and 3 are pretty much the same, but perhaps I'm missing the difference.


EronWright commented on September 27, 2024

This long comment is my way of saying that the Pravega connector (also the Kinesis connector) faces new challenges, and that we shouldn't assume that historical stream processing 'just works' with the wonderful determinism we've come to expect from Flink. I am not quite the expert that other folks here are; these are just my observations.

I find it useful to imagine there being two 'regimes' in stream processing. Consider a job started against some pre-existing stream. Processing begins with the oldest records, working through them as quickly as possible to reach the tail. This is the 'historical' regime. Once at the tail, the processor enters a 'real-time' regime. There are important differences. In the real-time regime, the various time domains are roughly aligned, elements are processed roughly in the order they're received, and the logic of watermark assignment can reasonably be delegated to the application. In the historical regime, there's an abundance of old data to process, system internals (like the implementation details of the reader group) heavily influence the processing order, and watermark assignment becomes more complex. Unless the connector is designed with historical processing in mind, the net effect is a loss of determinism, late records, etc.

Having per-partition assigners, in combination with watermark aggregation logic within the source and elsewhere, seems to be a good approach to decouple the app-specific aspects of assignment from the system-dependent aspects.

I feel the issue of non-determinism in historical processing is most acute in combination with dynamic work rebalancing and/or dynamic partitioning. Take the Kafka connector: each reader has a fixed set of partitions to process, and so the min-watermark calculation is rational. Throw in dynamic partitioning or rebalancing, and the possibility of late records is greatly increased.
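To see why fixed assignment keeps the min-watermark calculation rational, consider the per-partition minimum a subtask computes when it owns a stable partition set (a sketch, not the Kafka connector's actual code):

    import java.util.Map;

    // With a fixed partition set, the minimum over per-partition watermarks
    // only ever advances. Under dynamic reassignment, a newly acquired
    // partition may sit far behind this minimum, which is exactly how late
    // records slip in.
    class MinWatermarkSketch {
        static long subtaskWatermark(Map<String, Long> perPartitionWatermarks) {
            return perPartitionWatermarks.values().stream()
                    .mapToLong(Long::longValue)
                    .min()
                    .orElse(Long.MIN_VALUE);
        }
    }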


StephanEwen commented on September 27, 2024

I hope to find some time in the next days to respond with some more details, but for now here is a summary:

  • From the use-case perspective, there is an incentive to make event time work.
  • Event time support does not strictly need order, but it does need a way to put a bound on out-of-orderness (see the sketch after this list).
  • I understand Flavio's desire not to put ordering into the abstraction of Pravega's readers, and I think that he is right about that.
  • In many parts, Flink cannot assume order either. Network shuffles mess up order.
  • Having a mechanism to compensate for that (buffering plus watermarks) is a great way to go from non-deterministic order and out-of-orderness to deterministic results.
  • We may not end up using the same mechanism in Pravega and the Reader, but we could think along similar lines.
  • Processing historic data should, in the future, work in Flink without watermarks and out-of-order bounds. You only need the watermark of the point where you switch from the historic data to the real-time data (when you catch up with the tail of the log). In between, it does not matter; order may be completely different than in the original run.
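As a concrete instance of the out-of-orderness bound mentioned in the list, Flink already ships BoundedOutOfOrdernessTimestampExtractor. In this sketch, MyEvent and its getEventTime() accessor are placeholders, and the 10-second bound is an arbitrary choice:

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Placeholder event type for this sketch.
    class MyEvent {
        long eventTime; // millis since epoch
        long getEventTime() { return eventTime; }
    }

    class MyEventTimestamps extends BoundedOutOfOrdernessTimestampExtractor<MyEvent> {
        MyEventTimestamps() {
            super(Time.seconds(10)); // tolerate up to 10s of out-of-orderness
        }

        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getEventTime();
        }
    }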


EronWright commented on September 27, 2024

For me, a key question is the semantics of the application-specific watermark generation function. Ideally, the function conveys domain knowledge (e.g., IoT specifics) without knowing the system internals (e.g., how the reader group selects segments). Meanwhile, the watermark produced by the function is aggregated by the system at various levels, e.g., across subtask instances and, in the Kafka case, across partitions.

We have considerable flexibility in how the Pravega source invokes the watermark function and performs aggregation. Could we somehow simplify the app developer's burden of writing a correct watermark function? Here are a few hypothetical solutions with diagrams. Note the function-like syntax W(...), which denotes a stateful instance of a watermark function.

First, the data we'll be working with. Three keys (A,B,C) with six segments (S1..S6). In this example, the producer is well-behaved, producing ascending timestamps per key.

┌────┬───┬───────┐     ┌────┐┌────┐┌────┐┌────┐
│1   │2  │4      │   A │ A1 ││ A2 ││ A3 ││ .. │
│    │   │ A     │     └────┘└────┘└────┘└────┘
│    │ A ├───┬───┤     ┌────┐┌────┐┌────┐┌────┐
│ A  │ B │5  │6  │   B │ B1 ││ B2 ││ B3 ││ .. │
│ B  │   │ B │   │     └────┘└────┘└────┘└────┘
│ C  ├───┴───┤ B │     ┌────┐┌────┐┌────┐┌────┐
│    │3      │ C │   C │ C1 ││ C2 ││ C3 ││ .. │
│    │ C     │   │     └────┘└────┘└────┘└────┘
└────┴───────┴───┘                             

1) Watermark function per routing key. This is an ideal solution because it matches Pravega's natural ordering guarantee and hides all system internals. In this example, the app could simply use the predefined AscendingTimestampExtractor function (a usage sketch follows the diagram below).

    ┌────┐┌────┐┌────┐┌────┐  
  W(│ A1 ││ A2 ││ A3 ││ .. │) 
    └────┘└────┘└────┘└────┘  
    ┌────┐┌────┐┌────┐┌────┐  
  W(│ B1 ││ B2 ││ B3 ││ .. │) 
    └────┘└────┘└────┘└────┘  
    ┌────┐┌────┐┌────┐┌────┐  
  W(│ C1 ││ C2 ││ C3 ││ .. │) 
    └────┘└────┘└────┘└────┘  
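If the connector could invoke one stateful extractor instance per routing key, the application side of option 1 would reduce to the predefined extractor, as in the following sketch (MyEvent and getEventTime() are again placeholders):

    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

    // Per-key timestamps are ascending under Pravega's ordering guarantee,
    // so the predefined AscendingTimestampExtractor suffices; no knowledge
    // of segments or reader internals leaks into the application code.
    class PerKeyAscendingTimestamps extends AscendingTimestampExtractor<MyEvent> {
        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getEventTime();
        }
    }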

2) Watermark function per segment. Here, the function must deal with ordering across keys, but may be written as though the stream consists of one fixed segment.

   ┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐  
 W(│ A1 ││ B1 ││ A2 ││ B2 ││ B3 ││ C1 ││ C2 ││ A3 ││ .. │) 
   └────┘└────┘└────┘└────┘└────┘└────┘└────┘└────┘└────┘  

3) Watermark function per sub-task. Here, the function must deal with ordering across keys and with ordering of segment processing. This diagram reflects the current implementation.

   ╔════╗╔════╗╔════╗╔════╗       
 W(║ S1 ║║ S2 ║║ S5 ║║ .. ║)      
   ╚════╝╚════╝╚════╝╚════╝       
   ╔════╗╔════╗╔════╗╔════╗╔════╗ 
 W(║ -  ║║ S3 ║║ S6 ║║ S4 ║║ .. ║)
   ╚════╝╚════╝╚════╝╚════╝╚════╝ 

For reference, here's the gist of the commonly-used watermark functions:
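The following is a from-memory paraphrase of the core of BoundedOutOfOrdernessTimestampExtractor, not verbatim Flink source: remember the maximum timestamp seen so far and emit a watermark that lags it by the configured bound.

    import org.apache.flink.streaming.api.watermark.Watermark;

    // From-memory paraphrase, not verbatim Flink source.
    // (AscendingTimestampExtractor is the degenerate case: its watermark is
    // simply the last timestamp minus one.)
    abstract class BoundedExtractorGist<T> {
        private final long maxOutOfOrderness;
        private long currentMaxTimestamp;

        BoundedExtractorGist(long maxOutOfOrdernessMillis) {
            this.maxOutOfOrderness = maxOutOfOrdernessMillis;
            // Start low enough that the first watermark is Long.MIN_VALUE.
            this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
        }

        abstract long extractTimestamp(T element);

        long onElement(T element) {
            long ts = extractTimestamp(element);
            currentMaxTimestamp = Math.max(ts, currentMaxTimestamp);
            return ts;
        }

        Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }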


tkaitchuck commented on September 27, 2024

In the above examples, do the W() groupings also correspond to the data received by one reader?

Sections 1) and 2) use A1, B1, C3, etc. to label events, while 3) uses S1-S6; do these refer to segments? If so, the order of segments listed would violate the ordering guarantees, and, that aside, it is not the way the code works now.
You did not mention having a watermark function per stream or per reader group; these are also options.
However, at the end of the day, I think we are most interested in event time, not write time, in which case the only extractor that really makes sense is BoundedOutOfOrdernessTimestampExtractor, since we don't control event time.


EronWright commented on September 27, 2024

Sorry, diagram (3) is not very clear. It illustrates only the order of segment processing as experienced by a particular (stateful) watermark function instance. It does not imply that S5 and S6 would be processed concurrently. I stand by my assertion that a subtask could process S6 before S4.


tkaitchuck commented on September 27, 2024

The client does not return segments, so a segment is never 'processed'. If you mean that some events in segment 4 could go after some events in segment 6, then yes. That shouldn't be surprising: after all, they are in totally independent parts of the routing-key space, and the events were written in parallel, so it is only natural that they would be read in parallel. It is also possible, in the above example, that A1, A2, A3 will all be read before C1, or all after. The contract is ordering by key. The segments are an irrelevant implementation detail; there is no way to know they exist at the API level. Hence the Flink connector should not know or care about their existence or structure.


EronWright commented on September 27, 2024

I think that Tom is rightly concerned about keeping segments an implementation detail. The reason they are mentioned is that segments affect processing order in today's reader group implementation. Sorry for any undue implications about exposing segments to users.


EronWright commented on September 27, 2024

Just a note: there's improved support for tracking a global watermark at the Flink level, based on a general-purpose 'global aggregate tracker' provided by the JM. Might be useful.
https://issues.apache.org/jira/browse/FLINK-10886
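Here is a sketch (from memory; the interface and method names should be verified against FLINK-10886) of how a source subtask might fold its local watermark into a job-wide minimum through that tracker:

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;

    // Sketch only: a real tracker needs per-subtask state so the minimum
    // can also advance over time; this merely illustrates the API shape.
    class GlobalMinWatermark {
        static long update(GlobalAggregateManager manager, long localWatermark)
                throws java.io.IOException {
            return manager.updateGlobalAggregate("min-watermark", localWatermark,
                    new AggregateFunction<Long, Long, Long>() {
                        @Override public Long createAccumulator() { return Long.MAX_VALUE; }
                        @Override public Long add(Long value, Long acc) { return Math.min(value, acc); }
                        @Override public Long getResult(Long acc) { return acc; }
                        @Override public Long merge(Long a, Long b) { return Math.min(a, b); }
                    });
        }
    }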

