Executing a word-count-like program (the simplest scenario in which to trace down the problem) against Euphoria's Flink streaming executor using TimeSliding windowing reveals a huge shuffle overhead compared to an implementation of the same program using Flink's native Java API.
Experiment characteristics
- Apart from the key, the "values" being shuffled (as intended by the programmer) are small in both cases:
- On native Flink: a word (string)
- On Euphoria: just a long (the value to be reduced)
- TimeSliding windowing is used; the more windows per element, the bigger Euphoria's overhead
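To see why TimeSliding multiplies the overhead, consider how many windows a single element is assigned to. The sketch below is illustrative, not Euphoria or Flink code (the class and method names are made up); it implements the standard sliding-window assignment rule, under which each element belongs to roughly size/slide windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of sliding-window assignment: an element with a given
// timestamp belongs to every window of length `size` that starts at a
// multiple of `slide` and still covers the timestamp.
public class SlidingWindows {
    // Returns the start timestamps of all windows containing `timestamp`.
    static List<Long> assignWindows(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window containing the timestamp.
        long lastStart = timestamp - ((timestamp % slide + slide) % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A 10s window sliding every 2s assigns each element to 5 windows,
        // so window metadata is replicated 5 times per shuffled value.
        System.out.println(assignWindows(15_000L, 10_000L, 2_000L));
        // prints [14000, 12000, 10000, 8000, 6000]
    }
}
```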
Problem
Euphoria tries to implement a strategy that reduces the size of the values actually being shuffled. That is, instead of shuffling RBK's (and RSBK's) original input elements, it extracts the "values" (to be reduced later) before the shuffle phase and sends those for reduction instead of the original input elements. The reasoning behind this choice is to send the typically smaller values rather than the bigger input elements. However, since windowing is applied to the operator's original input elements, the executor also needs to extract each original input element's windows and send them together with the value across the wire. The more windows there are for a single value, the bigger the overhead of the data being shuffled.
The more data is being shuffled, the quicker network buffers fill up, which can lead to back pressure and thereby slow down the whole pipeline. (To mitigate back pressure, we can resize the network buffers at the cost of higher memory consumption.)
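For reference, Flink exposes the network buffer pool size through its configuration; the exact keys depend on the Flink version, so the fragment below is a sketch rather than a recommendation (the values shown are arbitrary examples):

```yaml
# flink-conf.yaml -- enlarging the network buffer pool trades memory for
# more headroom before back pressure kicks in.
# Older Flink releases used a plain buffer count:
taskmanager.network.numberOfBuffers: 8192
# Flink 1.5+ instead sizes the pool as a fraction of TaskManager memory:
taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.min: 67108864
taskmanager.network.memory.max: 1073741824
```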
Our implementation of the idea to shuffle only values instead of the original input elements suffers from the following problems:
- If the "value" being shuffled isn't smaller than or equal in size to the original input element, we immediately introduce unnecessary overhead.
- If the "value" being shuffled plus the necessary window information for the element isn't smaller than or equal in size to the original input element, we introduce unnecessary overhead. (Note: this problem is not relevant when considering a partitioning scheme by "window and key"; this is a potential but not-yet-implemented/explored reduce-by-key strategy.)
- Users of the Euphoria API have no way to express a preference for a particular shuffle strategy.
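The first two points can be made concrete with a back-of-the-envelope accounting of the shuffled record size in both approaches. The byte counts below are hypothetical (real serializer output differs; a window descriptor is modeled as a single 8-byte timestamp), but the relationship they illustrate holds: with several windows per element, the "small value" optimization is dwarfed by the window metadata.

```java
// Hypothetical byte accounting of one shuffled record, not actual
// serializer output.
public class ShufflePayload {
    static final int LONG_BYTES = 8;

    // Native Flink word count: the shuffled record is the word itself
    // (which doubles as the key).
    static int nativePayload(String word) {
        return word.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
    }

    // Euphoria's strategy: key + extracted long value + one window
    // descriptor per assigned window.
    static int euphoriaPayload(String key, int numWindows) {
        int keyBytes = key.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
        return keyBytes + LONG_BYTES + numWindows * LONG_BYTES;
    }

    public static void main(String[] args) {
        // With a 10s/2s TimeSliding (5 windows per element):
        System.out.println(nativePayload("apple"));      // 5 bytes
        System.out.println(euphoriaPayload("apple", 5)); // 5 + 8 + 5*8 = 53 bytes
    }
}
```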
What's to do next
Conclusion
It is unclear whether the strategy currently implemented by Euphoria's streaming Flink executor is sufficient for the most common use-cases. At best, we can say the strategy is not flexible enough to support use-cases where the original input element's size does not differ significantly from that of the "value" being reduced. Further, from an API perspective, users are not able to optimize for such use-cases (whereas users of the Flink API can optimize for the opposite use-case; basically, users of the Flink API can implement themselves what Euphoria's streaming Flink executor does).
Last but not least, the problem of this "reduce-by-key" strategy as detailed above explains a significant part of the overhead measured under #14, which uses a benchmark with a "word-count"-like reduction using TimeSliding windowing.