Reactive Streams

The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.

The latest release is available on Maven Central as

<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams</artifactId>
  <version>1.0.4</version>
</dependency>
<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams-tck</artifactId>
  <version>1.0.4</version>
  <scope>test</scope>
</dependency>

Goals, Design and Scope

Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be carefully controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine.

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary (think passing elements on to another thread or thread pool) while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the backpressure signals were synchronous (see also the Reactive Manifesto), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation.

It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application.

It should be noted that the precise nature of stream manipulations (transformation, splitting, merging, etc.) is not covered by this specification. Reactive Streams are only concerned with mediating the stream of data between different API Components. In their development care has been taken to ensure that all basic ways of combining streams can be expressed.

In summary, Reactive Streams is a standard and specification for Stream-oriented libraries for the JVM that

  • process a potentially unbounded number of elements
  • in sequence,
  • asynchronously passing elements between components,
  • with mandatory non-blocking backpressure.

The Reactive Streams specification consists of the following parts:

The API specifies the types to implement Reactive Streams and achieve interoperability between different implementations.

The Technology Compatibility Kit (TCK) is a standard test suite for conformance testing of implementations.

Implementations are free to implement additional features not covered by the specification as long as they conform to the API requirements and pass the tests in the TCK.

API Components

The API consists of the following components that are required to be provided by Reactive Streams implementations:

  1. Publisher
  2. Subscriber
  3. Subscription
  4. Processor

A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).

In response to a call to Publisher.subscribe(Subscriber) the possible invocation sequences for methods on the Subscriber are given by the following protocol:

onSubscribe onNext* (onError | onComplete)?

This means that onSubscribe is always signalled, followed by a possibly unbounded number of onNext signals (as requested by Subscriber) followed by an onError signal if there is a failure, or an onComplete signal when no more elements are available—all as long as the Subscription is not cancelled.
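The grammar above can be checked mechanically. The following is a minimal sketch, assuming JDK 9 or later, where `java.util.concurrent.Flow.Subscriber` mirrors the `Subscriber` interface of this specification. `SignalTrace` is a hypothetical helper name, not part of the API. It records each signal as one letter and matches the trace against a regular expression; it never signals demand itself, so it is meant for inspecting signals rather than for consuming a stream.

```java
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.regex.Pattern;

// Hypothetical test helper (not part of the specification): records every
// signal as one letter so the trace can be matched against the grammar
// onSubscribe onNext* (onError | onComplete)?
final class SignalTrace<T> implements Flow.Subscriber<T> {
    // S = onSubscribe, N = onNext, E = onError, C = onComplete
    static final Pattern PROTOCOL = Pattern.compile("SN*(E|C)?");

    // Plain StringBuilder: relies on the Publisher signalling serially.
    private final StringBuilder trace = new StringBuilder();

    @Override public void onSubscribe(Flow.Subscription s) {
        Objects.requireNonNull(s, "subscription");
        trace.append('S');
    }

    @Override public void onNext(T item) {
        Objects.requireNonNull(item, "item");
        trace.append('N');
    }

    @Override public void onError(Throwable t) {
        Objects.requireNonNull(t, "throwable");
        trace.append('E');
    }

    @Override public void onComplete() {
        trace.append('C');
    }

    // The signals seen so far, in order of arrival.
    String trace() { return trace.toString(); }

    // True if the signals seen so far form a legal sequence.
    boolean isLegal() { return isLegal(trace.toString()); }

    // True if the given letter sequence is allowed by the protocol.
    static boolean isLegal(String signals) {
        return PROTOCOL.matcher(signals).matches();
    }
}
```

For instance, the traces `S`, `SNNN` and `SNNC` are legal, whereas `NS` (element before `onSubscribe`) and `SNCE` (two terminal signals) are not.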

NOTES

Glossary

Term Definition
Signal As a noun: one of the onSubscribe, onNext, onComplete, onError, request(n) or cancel methods. As a verb: calling/invoking a signal.
Demand As a noun, the aggregated number of elements requested by a Subscriber which is yet to be delivered (fulfilled) by the Publisher. As a verb, the act of request-ing more elements.
Synchronous(ly) Executes on the calling Thread.
Return normally Only ever returns a value of the declared type to the caller. The only legal way to signal failure to a Subscriber is via the onError method.
Responsivity Readiness/ability to respond. In this document used to indicate that the different components should not impair each other's ability to respond.
Non-obstructing Quality describing a method which is as quick to execute as possible on the calling thread. This means, for example, avoiding heavy computations and other things that would stall the caller's thread of execution.
Terminal state For a Publisher: When onComplete or onError has been signalled. For a Subscriber: When an onComplete or onError has been received.
NOP Execution that has no detectable effect to the calling thread, and can as such safely be called any number of times.
Serial(ly) In the context of a Signal, non-overlapping. In the context of the JVM, calls to methods on an object are serial if and only if there is a happens-before relationship between those calls (implying also that the calls do not overlap). When the calls are performed asynchronously, coordination to establish the happens-before relationship is to be implemented using techniques such as, but not limited to, atomics, monitors, or locks.
Thread-safe Can be safely invoked synchronously, or asynchronously, without requiring external synchronization to ensure program correctness.

SPECIFICATION

1. Publisher (Code)

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
ID Rule
1 The total number of onNext's signalled by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber's Subscription at all times.
💡 The intent of this rule is to make it clear that Publishers cannot signal more elements than Subscribers have requested. There’s an implicit, but important, consequence to this rule: Since demand can only be fulfilled after it has been received, there’s a happens-before relationship between requesting elements and receiving elements.
2 A Publisher MAY signal fewer onNext than requested and terminate the Subscription by calling onComplete or onError.
💡 The intent of this rule is to make it clear that a Publisher cannot guarantee that it will be able to produce the number of elements requested; it simply might not be able to produce them all; it may be in a failed state; it may be empty or otherwise already completed.
3 onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
💡 The intent of this rule is to permit signals to be emitted (including from multiple threads) if and only if a happens-before relation between each of the signals is established.
4 If a Publisher fails it MUST signal an onError.
💡 The intent of this rule is to make it clear that a Publisher is responsible for notifying its Subscribers if it detects that it cannot proceed; Subscribers must be given a chance to clean up resources or otherwise deal with the Publisher's failures.
5 If a Publisher terminates successfully (finite stream) it MUST signal an onComplete.
💡 The intent of this rule is to make it clear that a Publisher is responsible for notifying its Subscribers that it has reached a terminal state—Subscribers can then act on this information; clean up resources, etc.
6 If a Publisher signals either onError or onComplete on a Subscriber, that Subscriber’s Subscription MUST be considered cancelled.
💡 The intent of this rule is to make sure that a Subscription is treated the same no matter if it was cancelled, the Publisher signalled onError or onComplete.
7 Once a terminal state has been signaled (onError, onComplete) it is REQUIRED that no further signals occur.
💡 The intent of this rule is to make sure that onError and onComplete are the final states of an interaction between a Publisher and Subscriber pair.
8 If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.
💡 The intent of this rule is to make sure that Publishers respect a Subscriber’s request to cancel a Subscription when Subscription.cancel() has been called. The reason for eventually is because signals can have propagation delay due to being asynchronous.
9 Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber and MUST return normally, except when the provided Subscriber is null, in which case it MUST throw a java.lang.NullPointerException to the caller. For all other situations, the only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe).
💡 The intent of this rule is to make sure that onSubscribe is always signalled before any of the other signals, so that initialization logic can be executed by the Subscriber when the signal is received. Also onSubscribe MUST only be called at most once, [see 2.12]. If the supplied Subscriber is null, there is nowhere else to signal this but to the caller, which means a java.lang.NullPointerException must be thrown. Examples of possible situations: A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, or in a terminal state.
10 Publisher.subscribe MAY be called as many times as wanted but MUST be with a different Subscriber each time [see 2.12].
💡 The intent of this rule is to have callers of subscribe be aware that a generic Publisher and a generic Subscriber cannot be assumed to support being attached multiple times. Furthermore, it also mandates that the semantics of subscribe must be upheld no matter how many times it is called.
11 A Publisher MAY support multiple Subscribers and decides whether each Subscription is unicast or multicast.
💡 The intent of this rule is to give Publisher implementations the flexibility to decide how many, if any, Subscribers they will support, and how elements are going to be distributed.
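To make the Publisher rules concrete, here is a minimal sketch of a synchronous Publisher that emits the integers from 0 up to a given count. It assumes JDK 9 or later and uses the `java.util.concurrent.Flow` interfaces, which mirror the ones in this specification. `RangePublisher` and its `drain` loop are illustrative names and choices, not something the specification prescribes. The sketch is single-threaded, so it does not meet the thread-safety requirement that rule 3.5 places on `cancel`, and it does not handle a Subscriber that throws.

```java
import java.util.Objects;
import java.util.concurrent.Flow;

// Sketch only: a synchronous, single-threaded Publisher of the integers
// 0 (inclusive) to count (exclusive). Each Subscriber gets its own Subscription.
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
        if (count < 0) throw new IllegalArgumentException("count must be >= 0");
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber"); // rule 1.9
        RangeSubscription subscription = new RangeSubscription(subscriber, count);
        subscriber.onSubscribe(subscription); // rule 1.9: before any other signal
        subscription.drain();                 // completes an empty range without demand
    }

    private static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int count;
        private int next;          // next value to emit
        private long demand;       // requested but not yet delivered (rule 1.1)
        private Throwable failure; // pending onError, if any
        private boolean done;      // cancelled or terminal signal sent
        private boolean draining;  // reentrancy guard (rule 3.3)

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int count) {
            this.subscriber = subscriber;
            this.count = count;
        }

        @Override
        public void request(long n) {
            if (done) return; // rule 3.6: NOP after cancellation
            if (n <= 0) {
                failure = new IllegalArgumentException(
                        "rule 3.9: non-positive request signals are illegal");
            } else {
                long sum = demand + n;
                demand = sum < 0 ? Long.MAX_VALUE : sum; // rule 3.17: saturate
            }
            drain();
        }

        @Override
        public void cancel() {
            done = true; // idempotent; later request calls become NOPs
        }

        void drain() {
            if (draining) return; // reentrant call: the outer loop sees the new state
            draining = true;
            try {
                while (!done) {
                    if (failure != null) {
                        done = true;
                        subscriber.onError(failure);     // rule 1.4
                    } else if (next == count) {
                        done = true;
                        subscriber.onComplete();         // rule 1.5
                    } else if (demand > 0) {
                        demand--;
                        subscriber.onNext(next++);       // never exceeds demand
                    } else {
                        break;                           // wait for more demand
                    }
                }
            } finally {
                draining = false;
            }
        }
    }
}
```

The reentrancy guard is one way to bound the mutual recursion between `request` and `onNext`: a Subscriber that calls `request(1)` from inside `onNext` only increments the counter, and the loop that is already running delivers the next element.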

2. Subscriber (Code)

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
ID Rule
1 A Subscriber MUST signal demand via Subscription.request(long n) to receive onNext signals.
💡 The intent of this rule is to establish that it is the responsibility of the Subscriber to decide when and how many elements it is able and willing to receive. To avoid signal reordering caused by reentrant Subscription methods, it is strongly RECOMMENDED for synchronous Subscriber implementations to invoke Subscription methods at the very end of any signal processing. It is RECOMMENDED that Subscribers request the upper limit of what they are able to process, as requesting only one element at a time results in an inherently inefficient "stop-and-wait" protocol.
2 If a Subscriber suspects that its processing of signals will negatively impact its Publisher's responsivity, it is RECOMMENDED that it asynchronously dispatches its signals.
💡 The intent of this rule is that a Subscriber should not obstruct the progress of the Publisher from an execution point-of-view. In other words, the Subscriber should not starve the Publisher from receiving CPU cycles.
3 Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST NOT call any methods on the Subscription or the Publisher.
💡 The intent of this rule is to prevent cycles and race-conditions—between Publisher, Subscription and Subscriber—during the processing of completion signals.
4 Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST consider the Subscription cancelled after having received the signal.
💡 The intent of this rule is to make sure that Subscribers respect a Publisher’s terminal state signals. A Subscription is simply not valid anymore after an onComplete or onError signal has been received.
5 A Subscriber MUST call Subscription.cancel() on the given Subscription after an onSubscribe signal if it already has an active Subscription.
💡 The intent of this rule is to prevent two or more separate Publishers from trying to interact with the same Subscriber. Enforcing this rule means that resource leaks are prevented since extra Subscriptions will be cancelled. Failure to conform to this rule may lead to violations of Publisher rule 1, amongst others. Such violations can lead to hard-to-diagnose bugs.
6 A Subscriber MUST call Subscription.cancel() if the Subscription is no longer needed.
💡 The intent of this rule is to establish that Subscribers cannot just throw Subscriptions away when they are no longer needed; they have to call cancel so that resources held by that Subscription can be reclaimed safely and in a timely manner. An example of this would be a Subscriber which is only interested in a specific element, which would then cancel its Subscription to signal its completion to the Publisher.
7 A Subscriber MUST ensure that all calls on its Subscription's request and cancel methods are performed serially.
💡 The intent of this rule is to permit the calling of the request and cancel methods (including from multiple threads) if and only if a serial relation between each of the calls is established.
8 A Subscriber MUST be prepared to receive one or more onNext signals after having called Subscription.cancel() if there are still requested elements pending [see 3.12]. Subscription.cancel() does not guarantee to perform the underlying cleaning operations immediately.
💡 The intent of this rule is to highlight that there may be a delay between calling cancel and the Publisher observing that cancellation.
9 A Subscriber MUST be prepared to receive an onComplete signal with or without a preceding Subscription.request(long n) call.
💡 The intent of this rule is to establish that completion is unrelated to the demand flow—this allows for streams which complete early, and obviates the need to poll for completion.
10 A Subscriber MUST be prepared to receive an onError signal with or without a preceding Subscription.request(long n) call.
💡 The intent of this rule is to establish that Publisher failures may be completely unrelated to signalled demand. This means that Subscribers do not need to poll to find out if the Publisher will not be able to fulfill its requests.
11 A Subscriber MUST make sure that all calls on its signal methods happen-before the processing of the respective signals. I.e. the Subscriber must take care of properly publishing the signal to its processing logic.
💡 The intent of this rule is to establish that it is the responsibility of the Subscriber implementation to make sure that asynchronous processing of its signals is thread-safe. See the JMM definition of Happens-Before in section 17.4.5 of the Java Language Specification.
12 Subscriber.onSubscribe MUST be called at most once for a given Subscriber (based on object equality).
💡 The intent of this rule is to establish that it MUST be assumed that the same Subscriber can only be subscribed at most once. Note that object equality is a.equals(b).
13 Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null, in which case it MUST throw a java.lang.NullPointerException to the caller. For all other situations, the only legal way for a Subscriber to signal failure is by cancelling its Subscription. In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.
💡 The intent of this rule is to establish the semantics for the methods of Subscriber and what the Publisher is allowed to do in the case that this rule is violated. «Raise this error condition in a fashion that is adequate for the runtime environment» could mean logging the error, or otherwise making someone or something aware of the situation, as the error cannot be signalled to the faulty Subscriber.
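Several of the Subscriber rules can be illustrated together. The following is a minimal sketch of a synchronous Subscriber, again assuming the `java.util.concurrent.Flow` interfaces of JDK 9 or later as stand-ins for the interfaces above. `BatchingSubscriber` and its `batch` parameter are hypothetical names. It requests elements in batches rather than one at a time (rule 2.1), cancels any extra Subscription (rule 2.5), leaves the Subscription alone in the terminal callbacks (rules 2.3 and 2.4), rejects null arguments (rule 2.13), and calls `request` as the last statement of its signal handlers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

// Sketch only: collects elements, requesting them in fixed-size batches.
// Not thread-safe; it assumes the Publisher signals serially (rule 1.3).
final class BatchingSubscriber<T> implements Flow.Subscriber<T> {
    private final long batch;
    private final List<T> received = new ArrayList<>();
    private Flow.Subscription subscription; // null before onSubscribe and after termination
    private long outstanding;               // requested but not yet delivered
    private boolean done;

    BatchingSubscriber(long batch) {
        if (batch <= 0) throw new IllegalArgumentException("batch must be positive");
        this.batch = batch;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        Objects.requireNonNull(s, "subscription"); // rule 2.13
        if (subscription != null || done) {
            s.cancel();                            // rule 2.5: already subscribed
            return;
        }
        subscription = s;
        outstanding = batch;
        s.request(batch);                          // rule 2.1, as the last statement
    }

    @Override
    public void onNext(T item) {
        Objects.requireNonNull(item, "item");      // rule 2.13
        if (done) return;                          // rule 2.8: tolerate late elements
        received.add(item);
        if (--outstanding == 0) {                  // batch used up: ask for the next one
            outstanding = batch;
            subscription.request(batch);
        }
    }

    @Override
    public void onError(Throwable t) {
        Objects.requireNonNull(t, "throwable");    // rule 2.13
        done = true;
        subscription = null;                       // rules 2.3 and 2.4: no calls, just drop it
    }

    @Override
    public void onComplete() {
        done = true;
        subscription = null;
    }

    List<T> received() { return received; }

    boolean isDone() { return done; }
}
```

A production Subscriber would typically replenish demand before the batch is fully consumed, so that the Publisher is never left idle; this sketch waits until the batch is exhausted to keep the bookkeeping easy to follow.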

3. Subscription (Code)

public interface Subscription {
    public void request(long n);
    public void cancel();
}
ID Rule
1 Subscription.request and Subscription.cancel MUST only be called inside of its Subscriber context.
💡 The intent of this rule is to establish that a Subscription represents the unique relationship between a Subscriber and a Publisher [see 2.12]. The Subscriber is in control over when elements are requested and when more elements are no longer needed.
2 The Subscription MUST allow the Subscriber to call Subscription.request synchronously from within onNext or onSubscribe.
💡 The intent of this rule is to make it clear that implementations of request must be reentrant, to avoid stack overflows in the case of mutual recursion between request and onNext (and eventually onComplete / onError). This implies that Publishers can be synchronous, i.e. signalling onNext's on the thread which calls request.
3 Subscription.request MUST place an upper bound on possible synchronous recursion between Publisher and Subscriber.
💡 The intent of this rule is to complement [see 3.2] by placing an upper limit on the mutual recursion between request and onNext (and eventually onComplete / onError). Implementations are RECOMMENDED to limit this mutual recursion to a depth of 1 (ONE) for the sake of conserving stack space. An example of undesirable synchronous, open recursion would be Subscriber.onNext -> Subscription.request -> Subscriber.onNext -> …, which would otherwise blow the calling thread's stack.
4 Subscription.request SHOULD respect the responsivity of its caller by returning in a timely manner.
💡 The intent of this rule is to establish that request is intended to be a non-obstructing method, and should be as quick to execute as possible on the calling thread, so avoid heavy computations and other things that would stall the caller's thread of execution.
5 Subscription.cancel MUST respect the responsivity of its caller by returning in a timely manner, MUST be idempotent and MUST be thread-safe.
💡 The intent of this rule is to establish that cancel is intended to be a non-obstructing method, and should be as quick to execute as possible on the calling thread, so avoid heavy computations and other things that would stall the caller's thread of execution. Furthermore, it is also important that it is possible to call it multiple times without any adverse effects.
6 After the Subscription is cancelled, additional Subscription.request(long n) MUST be NOPs.
💡 The intent of this rule is to establish a causal relationship between cancellation of a subscription and the subsequent non-operation of requesting more elements.
7 After the Subscription is cancelled, additional Subscription.cancel() MUST be NOPs.
💡 The intent of this rule is superseded by 3.5.
8 While the Subscription is not cancelled, Subscription.request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.
💡 The intent of this rule is to make sure that request-ing is an additive operation, as well as ensuring that a request for elements is delivered to the Publisher.
9 While the Subscription is not cancelled, Subscription.request(long n) MUST signal onError with a java.lang.IllegalArgumentException if the argument is <= 0. The cause message SHOULD explain that non-positive request signals are illegal.
💡 The intent of this rule is to prevent faulty implementations from proceeding without any exceptions being raised. Since requests are additive, requesting a negative or zero number of elements is most likely the result of an erroneous calculation on behalf of the Subscriber.
10 While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onNext on this (or other) subscriber(s).
💡 The intent of this rule is to establish that it is allowed to create synchronous Publishers, i.e. Publishers who execute their logic on the calling thread.
11 While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onComplete or onError on this (or other) subscriber(s).
💡 The intent of this rule is to establish that it is allowed to create synchronous Publishers, i.e. Publishers who execute their logic on the calling thread.
12 While the Subscription is not cancelled, Subscription.cancel() MUST request the Publisher to eventually stop signaling its Subscriber. The operation is NOT REQUIRED to affect the Subscription immediately.
💡 The intent of this rule is to establish that the desire to cancel a Subscription is eventually respected by the Publisher, acknowledging that it may take some time before the signal is received.
13 While the Subscription is not cancelled, Subscription.cancel() MUST request the Publisher to eventually drop any references to the corresponding subscriber.
💡 The intent of this rule is to make sure that Subscribers can be properly garbage-collected once their subscription is no longer valid. Re-subscribing with the same Subscriber object is discouraged [see 2.12], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely.
14 While the Subscription is not cancelled, calling Subscription.cancel MAY cause the Publisher, if stateful, to transition into the shut-down state if no other Subscription exists at this point [see 1.9].
💡 The intent of this rule is to allow for Publishers to signal onComplete or onError following onSubscribe for new Subscribers in response to a cancellation signal from an existing Subscriber.
15 Calling Subscription.cancel MUST return normally.
💡 The intent of this rule is to disallow implementations from throwing exceptions in response to cancel being called.
16 Calling Subscription.request MUST return normally.
💡 The intent of this rule is to disallow implementations from throwing exceptions in response to request being called.
17 A Subscription MUST support an unbounded number of calls to request and MUST support a demand up to 2^63-1 (java.lang.Long.MAX_VALUE). A demand equal to or greater than 2^63-1 (java.lang.Long.MAX_VALUE) MAY be considered by the Publisher as “effectively unbounded”.
💡 The intent of this rule is to establish that the Subscriber can request an unbounded number of elements, in any increment above 0 [see 3.9], in any number of invocations of request. As it is not feasibly reachable with current or foreseen hardware within a reasonable amount of time (1 element per nanosecond would take 292 years) to fulfill a demand of 2^63-1, it is allowed for a Publisher to stop tracking demand beyond this point.

A Subscription is shared by exactly one Publisher and one Subscriber for the purpose of mediating the data exchange between this pair. This is the reason why the subscribe() method does not return the created Subscription, but instead returns void; the Subscription is only passed to the Subscriber via the onSubscribe callback.
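Rules 3.8, 3.9 and 3.17 together describe demand as an additive counter that rejects non-positive increments and saturates at `Long.MAX_VALUE`. A minimal single-threaded sketch of that bookkeeping follows; `DemandCounter` is a hypothetical helper name, and a concurrent implementation would need an atomic variable instead of a plain field. Because `request` must return normally (rule 3.16), the helper reports an illegal argument through its return value so that the caller can signal `onError`, rather than throwing.

```java
// Hypothetical helper: tracks outstanding demand for one Subscription.
// Single-threaded sketch; a concurrent version would use an AtomicLong.
final class DemandCounter {
    private long outstanding;

    // Adds n to the outstanding demand (rule 3.8).
    // Returns false for n <= 0; the caller should then signal onError (rule 3.9).
    boolean add(long n) {
        if (n <= 0) return false;
        long sum = outstanding + n;
        // A negative sum means the addition overflowed: saturate (rule 3.17).
        outstanding = (sum < 0) ? Long.MAX_VALUE : sum;
        return true;
    }

    // Consumes one unit of demand if any is available.
    // Demand of Long.MAX_VALUE is treated as effectively unbounded and not decremented.
    boolean tryConsume() {
        if (outstanding == 0) return false;
        if (outstanding != Long.MAX_VALUE) outstanding--;
        return true;
    }

    long outstanding() { return outstanding; }

    // Whole years needed to deliver Long.MAX_VALUE elements at one element per
    // nanosecond, using 365-day years: the figure quoted in the note on rule 3.17.
    static long yearsAtOneElementPerNanosecond() {
        long seconds = Long.MAX_VALUE / 1_000_000_000L;
        return seconds / 86_400L / 365L;
    }
}
```

`yearsAtOneElementPerNanosecond()` evaluates to 292, which matches the estimate given for rule 3.17.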

4. Processor (Code)

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
ID Rule
1 A Processor represents a processing stage—which is both a Subscriber and a Publisher and MUST obey the contracts of both.
💡 The intent of this rule is to establish that Processors behave, and are bound by, both the Publisher and Subscriber specifications.
2 A Processor MAY choose to recover an onError signal. If it chooses to do so, it MUST consider the Subscription cancelled, otherwise it MUST propagate the onError signal to its Subscribers immediately.
💡 The intent of this rule is to inform that it’s possible for implementations to be more than simple transformations.

While not mandated, it can be a good idea to cancel a Processor's upstream Subscription when/if its last Subscriber cancels their Subscription, to let the cancellation signal propagate upstream.
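As an illustration of a processing stage, here is a minimal sketch of a mapping Processor built on the `java.util.concurrent.Flow` interfaces of JDK 9 or later. `MapProcessor` is a hypothetical name, and the sketch makes several simplifying assumptions that the specification does not require: it is single-threaded, supports exactly one downstream Subscriber, forwards demand upstream one-to-one, and expects the downstream Subscriber to be attached before the Processor is subscribed to its upstream (otherwise it simply cancels the upstream Subscription). It also propagates downstream cancellation upstream, as suggested above.

```java
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Sketch only: single-threaded, single-Subscriber mapping stage.
final class MapProcessor<T, R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscriber<? super R> downstream;
    private Flow.Subscription upstream;
    private long pendingDemand; // requested downstream before upstream was attached
    private boolean done;       // cancelled or terminated

    MapProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = Objects.requireNonNull(mapper, "mapper");
    }

    // Publisher side
    @Override
    public void subscribe(Flow.Subscriber<? super R> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber");
        if (downstream != null) { // reject: onSubscribe first, then onError (rule 1.9)
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            subscriber.onError(new IllegalStateException("this sketch supports one Subscriber"));
            return;
        }
        downstream = subscriber;
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    fail(new IllegalArgumentException("non-positive request signals are illegal"));
                } else if (upstream == null) {
                    long sum = pendingDemand + n;
                    pendingDemand = sum < 0 ? Long.MAX_VALUE : sum;
                } else {
                    upstream.request(n); // forward demand one-to-one
                }
            }
            @Override public void cancel() {
                if (done) return;
                done = true;
                if (upstream != null) upstream.cancel(); // propagate upstream
            }
        });
    }

    // Subscriber side
    @Override
    public void onSubscribe(Flow.Subscription s) {
        Objects.requireNonNull(s, "subscription");
        if (upstream != null || downstream == null || done) {
            s.cancel(); // extra Subscription, or no downstream to serve
            return;
        }
        upstream = s;
        long n = pendingDemand;
        pendingDemand = 0;
        if (n > 0) s.request(n);
    }

    @Override
    public void onNext(T item) {
        Objects.requireNonNull(item, "item");
        if (done || downstream == null) return;
        R mapped;
        try {
            mapped = Objects.requireNonNull(mapper.apply(item), "mapper returned null");
        } catch (RuntimeException e) {
            fail(e); // a failing mapper terminates the stage
            return;
        }
        downstream.onNext(mapped);
    }

    @Override
    public void onError(Throwable t) {
        Objects.requireNonNull(t, "throwable");
        if (done || downstream == null) return;
        done = true;
        downstream.onError(t); // rule 4.2: propagate
    }

    @Override
    public void onComplete() {
        if (done || downstream == null) return;
        done = true;
        downstream.onComplete();
    }

    private void fail(Throwable t) {
        done = true;
        if (upstream != null) upstream.cancel();
        downstream.onError(t);
    }
}
```

Forwarding demand one-to-one works for a mapping stage because every upstream element yields exactly one downstream element; a filtering stage would need to request replacements for the elements it drops.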

Asynchronous vs Synchronous Processing

The Reactive Streams API prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) MUST NOT block the Publisher. However, each of the on* handlers can process the events synchronously or asynchronously.

Take this example:

nioSelectorThreadOrigin map(f) filter(p) consumeTo(toNioSelectorOutput)

It has an async origin and an async destination. Let’s assume that both origin and destination are selector event loops. The Subscription.request(n) must be chained from the destination to the origin. This is now where each implementation can choose how to do this.

The following uses the pipe | character to signal async boundaries (queue and schedule) and R# to represent resources (possibly threads).

nioSelectorThreadOrigin | map(f) | filter(p) | consumeTo(toNioSelectorOutput)
-------------- R1 ----  | - R2 - | -- R3 --- | ---------- R4 ----------------

In this example each of the 3 consumers (map, filter and consumeTo) asynchronously schedules the work. It could be on the same event loop (trampoline), on separate threads, or anywhere else.

nioSelectorThreadOrigin map(f) filter(p) | consumeTo(toNioSelectorOutput)
------------------- R1 ----------------- | ---------- R2 ----------------

Here it is only the final step that asynchronously schedules, by adding work to the NioSelectorOutput event loop. The map and filter steps are synchronously performed on the origin thread.

Or another implementation could fuse the operations to the final consumer:

nioSelectorThreadOrigin | map(f) filter(p) consumeTo(toNioSelectorOutput)
--------- R1 ---------- | ------------------ R2 -------------------------

All of these variants are "asynchronous streams". They all have their place and each has different tradeoffs including performance and implementation complexity.

The Reactive Streams contract allows implementations the flexibility to manage resources and scheduling and mix asynchronous and synchronous processing within the bounds of a non-blocking, asynchronous, dynamic push-pull stream.

In order to allow fully asynchronous implementations of all participating API elements—Publisher/Subscription/Subscriber/Processor—all methods defined by these interfaces return void.

Subscriber controlled queue bounds

One of the underlying design principles is that all buffer sizes are to be bounded and these bounds must be known and controlled by the subscribers. These bounds are expressed in terms of element count (which in turn translates to the invocation count of onNext). Any implementation that aims to support infinite streams (especially high output rate streams) needs to enforce bounds all along the way to avoid out-of-memory errors and constrain resource usage in general.

Since backpressure is mandatory, the use of unbounded buffers can be avoided. In general, the only time when a queue might grow without bounds is when the publisher side maintains a higher rate than the subscriber for an extended period of time, but this scenario is handled by backpressure instead.

Queue bounds can be controlled by a subscriber signaling demand for the appropriate number of elements. At any point in time the subscriber knows:

  • the total number of elements requested: P
  • the number of elements that have been processed: N

Then the maximum number of elements that may arrive—until more demand is signaled to the Publisher—is P - N. In the case that the subscriber also knows the number of elements B in its input buffer then this bound can be refined to P - B - N.
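The two bounds are plain arithmetic, sketched below with a hypothetical `QueueBounds` helper. The third method, `safeToRequest`, is not stated in the text; it is a derived assumption. If every outstanding element arrived before any further processing, the buffer would hold B + (P - B - N) = P - N elements, so a subscriber with a buffer of a given capacity can keep P - N at or below that capacity by requesting at most capacity - (P - N) more.

```java
// Hypothetical helper for the bounds above.
// P = total requested, N = processed, B = currently buffered.
final class QueueBounds {
    private QueueBounds() { }

    // Upper bound on elements that may still arrive when only P and N are known.
    static long maxArrivals(long requested, long processed) {
        return requested - processed;
    }

    // Refined bound when the buffer occupancy B is also known.
    static long maxArrivals(long requested, long processed, long buffered) {
        return requested - buffered - processed;
    }

    // Derived assumption (not from the text): how much more may be requested
    // without ever exceeding a buffer of the given capacity.
    static long safeToRequest(long capacity, long requested, long processed) {
        return Math.max(0, capacity - (requested - processed));
    }
}
```

For example, with P = 10 and N = 4 at most 6 more elements may arrive; if B = 3 of them are already buffered, at most 3 are still in flight. With a buffer capacity of 8, the subscriber could safely request 2 more.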

These bounds must be respected by a publisher independent of whether the source it represents can be backpressured or not. In the case of sources whose production rate cannot be influenced—for example clock ticks or mouse movement—the publisher must choose to either buffer or drop elements to obey the imposed bounds.

Subscribers signaling a demand for one element after the reception of an element effectively implement a Stop-and-Wait protocol where the demand signal is equivalent to acknowledgement. By providing demand for multiple elements the cost of acknowledgement is amortized. It is worth noting that the subscriber is allowed to signal demand at any point in time, allowing it to avoid unnecessary delays between the publisher and the subscriber (i.e. keeping its input buffer filled without having to wait for full round-trips).

Legal

This project is a collaboration between engineers from Kaazing, Lightbend, Netflix, Pivotal, Red Hat, Twitter and many others. This project is licensed under MIT No Attribution (SPDX: MIT-0).

reactive-streams-io's Issues

Goals and Motivation

Due to the successful collaborations on Reactive Streams for the JVM and community involvement in Reactive Extensions (RxJava and friends) I want to pursue greater polyglot support of the Reactive Stream semantics. This includes both language specific interfaces and over-the-network protocols. I propose that we collaborate as a community to achieve the use cases I list below, along with any others I'm missing that we derive together.

In full disclosure, personally I am building systems that need polyglot, stream-oriented network interaction, primarily between Java and JavaScript in the near future. I prefer to collaborate and design the solution openly rather than reinvent yet another competing solution. I am unsatisfied with current solutions or unaware of better ones. Teams at Netflix are creating custom one-off solutions based on ReactiveX/Reactive-Stream semantics and I'd prefer we not do this in isolation. Selfishly I want the input of people far better at this domain than I am since I am out of my league in defining network protocols and interfaces in non-Java languages. I also want to avoid NIH (not-invented-here) and solve these problems across organizations and companies since systems I'm building will most likely outlive my involvement in them and community support and involvement in core, foundational networking and messaging layers is far better than home grown solutions in the long run. I expect this to slow me down in the near term, but greatly accelerate and improve the medium and long-term accomplishments, and significantly improve the final outcome.

The timelines I'm hoping for would be functioning prototypes and protocol drafts in a few months, with release candidates in 6-9 months (Q3/Q4-2015) and a GA release in early 2016. The team I work with at Netflix and I intend to build our systems along these timelines to prove out what we design here.

Additionally, I hope for collaboration across expertise domains to allow for debate, critiques, ideas and solutions that would not occur while staying in our individual silos.

Use Cases

The intent is to enable Reactive Stream semantics for async, stream-oriented IO supporting backpressure and cancelation.

On top of protocols such as TCP, WebSockets and possibly HTTP/2 it would allow bi-directional, multiplexed communication for these semantics:

  • subscribe, request(n), cancel
  • onNext, onError, onComplete
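The six signals above could map onto a small set of frame types. The following is only a sketch: `FrameType`, `Direction` and the UP/DOWN tagging are hypothetical names chosen for illustration (UP meaning subscriber to publisher, as in the examples in this post), not part of any agreed protocol.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: names and directions are illustrative assumptions,
// not part of any agreed Reactive Streams IO protocol.
class SignalFrames {

    /** Direction of travel: UP is subscriber to publisher, DOWN is the reverse. */
    enum Direction { UP, DOWN }

    /** The six signals listed above, tagged with the direction they travel. */
    enum FrameType {
        SUBSCRIBE(Direction.UP),
        REQUEST_N(Direction.UP),
        CANCEL(Direction.UP),
        ON_NEXT(Direction.DOWN),
        ON_ERROR(Direction.DOWN),
        ON_COMPLETE(Direction.DOWN);

        final Direction direction;

        FrameType(Direction direction) {
            this.direction = direction;
        }

        /** True for signals that end the subscription. */
        boolean isTerminal() {
            return this == CANCEL || this == ON_ERROR || this == ON_COMPLETE;
        }
    }

    /** Collects the frame types that travel in the given direction. */
    static Set<FrameType> travelling(Direction direction) {
        Set<FrameType> result = EnumSet.noneOf(FrameType.class);
        for (FrameType type : FrameType.values()) {
            if (type.direction == direction) {
                result.add(type);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("UP:   " + travelling(Direction.UP));
        System.out.println("DOWN: " + travelling(Direction.DOWN));
    }
}
```

On a bidirectional connection both peers could play both roles, so a real design would probably tag frames with a stream identifier rather than rely on direction alone.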

Usage patterns would include:

Scalar Request, Scalar Response

This would behave similarly to RPC/IPC calls.

For example:

  • UP subscribe("hello", 1) // to eliminate round-trip, the initial request(n) could be included in the subscribe
  • DOWN onNext("World!")
  • DOWN onComplete

Scalar Request, Vector Response

This would behave similarly to HTTP Server-Sent-Events.

For example:

  • UP subscribe("names", 100) // to eliminate round-trip, the initial request(n) could be included in the subscribe
  • DOWN onNext("Dave")
  • DOWN onNext("Tom")
  • DOWN onNext("Sarah")
  • DOWN onComplete

Or with request(n) and unsubscribe (the cancel signal) on an infinite stream:

  • UP subscribe("increment", 3) // to eliminate round-trip, the initial request(n) could be included in the subscribe
  • DOWN onNext(1)
  • DOWN onNext(2)
  • DOWN onNext(3)
  • UP request(2)
  • DOWN onNext(4)
  • DOWN onNext(5)
  • UP unsubscribe
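To make the exchange above concrete, here is a minimal single-threaded sketch using the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the Reactive Streams interfaces. `IncrementDemo`, `IncrementPublisher` and `RecordingSubscriber` are hypothetical names; there is no network involved, and a real implementation would also need to be thread-safe and pass the TCK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Minimal single-threaded sketch of the "increment" exchange above.
// All class names are hypothetical; this is not a conforming implementation.
class IncrementDemo {

    /** Emits 1, 2, 3, ... but never more than the subscriber has requested. */
    static final class IncrementPublisher implements Flow.Publisher<Integer> {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;       // outstanding request(n) credit
                private int next = 1;      // next value to emit
                private boolean cancelled;
                private boolean draining;  // guards against re-entrant request(n)

                @Override
                public void request(long n) {
                    if (cancelled) {
                        return;
                    }
                    if (n <= 0) {
                        cancelled = true;
                        subscriber.onError(new IllegalArgumentException("request(n) must be positive"));
                        return;
                    }
                    demand += n;
                    if (demand < 0) {
                        demand = Long.MAX_VALUE;  // cap on overflow
                    }
                    if (draining) {
                        return;  // the outer loop will pick up the new demand
                    }
                    draining = true;
                    while (demand > 0 && !cancelled) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** Records what it receives so the exchange can be inspected. */
    static final class RecordingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { }
    }

    /** Replays the UP signals from the example: request 3, request 2, cancel. */
    static List<Integer> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new IncrementPublisher().subscribe(subscriber);
        subscriber.subscription.request(3);   // stands in for subscribe("increment", 3)
        subscriber.subscription.request(2);   // UP request(2)
        subscriber.subscription.cancel();     // UP unsubscribe
        subscriber.subscription.request(10);  // ignored after cancel
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints [1, 2, 3, 4, 5]
    }
}
```

The publisher never emits past the outstanding demand, which is the property a network protocol would have to preserve across the wire.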

Bidirectional Streams

This would behave more like raw TCP or WebSockets.

The following example is very poor, but it is representative of the desire for messaging UP with event propagation DOWN across multiple subscriptions.

  • UP subscribe("user-events-XYZ", 100)
  • UP subscribe("data-updates", 100)
  • UP msg("eventA", "abc") // fire-and-forget a message
  • DOWN onNext("user-events-XYZ: eventA Completed")
  • UP msg("/do/something", "args")
  • DOWN onNext("user-events-XYZ: x-updated")
  • DOWN onNext("data-updates: 8756-modified")
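Multiplexing several subscriptions over one connection implies that each DOWN frame is routed to the subscription it belongs to. A rough sketch of that routing follows; `Demultiplexer` is a hypothetical name, and keying by stream name is an assumption made to match the example (a real protocol would more likely use a compact numeric stream id).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of routing DOWN frames from one shared connection to
// the subscription they belong to. No real transport is involved.
class MultiplexDemo {

    /** Routes each incoming payload to the handler registered for its stream. */
    static final class Demultiplexer {
        private final Map<String, Consumer<String>> handlers = new HashMap<>();

        /** Registers a handler, as would happen when a subscribe frame is sent UP. */
        void subscribe(String stream, Consumer<String> onNext) {
            handlers.put(stream, onNext);
        }

        /** Removes the handler, as would happen on cancel or a terminal signal. */
        void cancel(String stream) {
            handlers.remove(stream);
        }

        /** Delivers one DOWN onNext frame; returns false if no subscription matches. */
        boolean dispatch(String stream, String payload) {
            Consumer<String> handler = handlers.get(stream);
            if (handler == null) {
                return false;
            }
            handler.accept(payload);
            return true;
        }
    }

    public static void main(String[] args) {
        Demultiplexer demux = new Demultiplexer();
        List<String> userEvents = new ArrayList<>();
        List<String> dataUpdates = new ArrayList<>();
        demux.subscribe("user-events-XYZ", userEvents::add);
        demux.subscribe("data-updates", dataUpdates::add);
        demux.dispatch("user-events-XYZ", "eventA Completed");
        demux.dispatch("data-updates", "8756-modified");
        System.out.println(userEvents + " " + dataUpdates);
    }
}
```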

Possible Outcomes

Intended outcomes of this pursuit are:

  1. Discover there is already a solution for this and we can shut this down and use it.
  2. Decide we can't agree and we go off and build our own custom things.
  3. We determine this is a useful and newish thing, collaborate and build the above.

Artifacts

Following are artifacts envisioned from this collaboration during this first phase.

Network Protocol

This is expected to be purely a network protocol. Due to my ignorance I can't specify more, but I expect variations for:

  • binary and text (for example, for JavaScript apps it may be valuable to support text/JSON, whereas interprocess Java/C/Go/etc. would benefit from binary)
  • unidirectional and bidirectional transport layers (TCP vs HTTP/1 vs WebSockets vs HTTP/2 etc. as transport layers)
  • how serialization and protocol negotiation should work
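To illustrate the binary versus text trade-off in the first bullet, here is a sketch that encodes the same onNext frame both ways. The layout (1-byte type, 4-byte stream id, 4-byte length, payload), the type code, and the JSON field names are all assumptions made up for this example; no wire format has been agreed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical frame layout for illustration only; no wire format has been agreed.
class FrameEncodingDemo {

    static final byte ON_NEXT = 4;  // assumed type code

    /** Binary form: 1-byte type, 4-byte stream id, 4-byte payload length, payload. */
    static byte[] encodeBinary(byte type, int streamId, String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + 4 + body.length);
        buffer.put(type);
        buffer.putInt(streamId);
        buffer.putInt(body.length);
        buffer.put(body);
        return buffer.array();
    }

    /** Reads the payload back out of a frame produced by encodeBinary. */
    static String decodeBinaryPayload(byte[] frame) {
        ByteBuffer buffer = ByteBuffer.wrap(frame);
        buffer.get();     // skip type
        buffer.getInt();  // skip stream id
        byte[] body = new byte[buffer.getInt()];
        buffer.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    /** Text form: a JSON object built by hand; assumes the payload needs no escaping. */
    static String encodeText(String type, int streamId, String payload) {
        return "{\"type\":\"" + type + "\",\"stream\":" + streamId
                + ",\"data\":\"" + payload + "\"}";
    }

    public static void main(String[] args) {
        byte[] binary = encodeBinary(ON_NEXT, 7, "Dave");
        String text = encodeText("onNext", 7, "Dave");
        System.out.println("binary bytes: " + binary.length);
        System.out.println("text bytes:   " + text.getBytes(StandardCharsets.UTF_8).length);
        System.out.println(text);
    }
}
```

With this layout the binary frame for "Dave" is 13 bytes (1 + 4 + 4 + 4), while the text form is readable directly in a browser console, which is roughly the trade-off described above.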

Ultimately the desire is for protocols to be defined that can work on top of TCP, HTTP/1, HTTP/2, WebSockets and possibly others like UDP.

Java Interfaces and Reference Implementation

Java interfaces that expose the various use cases through Reactive Streams interfaces would be very powerful, allowing standard interop for Reactive Stream IO.

Nothing more than interfaces is expected to be defined, but a reference implementation with unit tests that prove functionality should be included.

JavaScript Interfaces and Reference Implementation

Similar desire as for Java above.

Network TCK

Along with the network protocol I would expect a test suite to validate implementations.

Moving Forward

As a first step I'd like to determine if there is sufficient interest and that this is not insane, completely naive and wrong, or reinventing something that already exists.

If we get through that part, I'll work with you all to create more concrete GitHub issues to start designing and making this happen.

Java Language Bindings

Another aspect of Reactive Streams IO that interests me is having language-specific APIs as bindings on top of the network protocol, using the Reactive Streams Publisher to expose the streams.

Specifically, I plan on defining RS interfaces in Java for client/server functionality over HTTP/1.1, SSE, WebSockets, HTTP/2, and TCP, as well as the RS.io network protocol we are now defining.

The intent would be for common RS.io interfaces to be used to implement the networking library (on top of various transport implementations such as Netty) and then allow various RS implementations like Akka, Rx, Reactor, etc. to layer on top. The hope is for collaboration to enable solid, efficient, battle-tested, reactive-stream-oriented networking APIs that can fit into our various opinionated apps.

The reason I'm seeking this is that we all seem to keep reinventing the wheel, defining APIs for input/output on protocols that don't change. I'm wondering whether, by putting our heads together, we can do better and get a foundation we can work on together while allowing the exploration and opinions to happen a layer above. Our collective move to RS seems like a good chance to work together before we all go and build our different-circumference wheels that all roll the same way.

Is this something that is too opinionated for RS, or would interfaces of this sort be useful across multiple libraries?

If it is of interest, should they be put in another repo such as reactive-streams-io-jvm or as a sub-module of reactive-streams-io?

UPDATE: This original posting was not very clear so I have clarified below: #5 (comment)

Security considerations and other Metadata

I had a thought about this and wanted an issue for tracking considerations around stream security.

Given that this is a protocol, there need to be allowances for security, such as token-based authentication, over the stream. WebSockets, which would likely be the choice for dealing with IO streams from browser-land, don't have any great built-in mechanisms for security propagation. There is an HTTP-based handshake, but not much more.

What allowances, if any, are there for transporting metadata of any sort (including custom security), in a standardized way, with each reactive stream message? For example, HTTP makes use of headers for this.

Perhaps this is a non-issue and is covered in another discussion.

Use Case Definitions

Per the initial agreement in #1 to pursue this further, this issue is intended to start collecting and defining the use cases.

RAET for reactive streams

The RAET (Reliable Asynchronous Event Transport) protocol seems like a good fit for reactive streams. It's currently implemented only in Python, but the protocol is simple enough that porting to JavaScript and other languages should be relatively straightforward. It is asynchronous and message-based, and it uses libsodium (NaCl) for end-to-end encryption and signing. Message bodies can be either raw binary or serialized with JSON or MessagePack. The protocol is extensible and modular.

See

https://github.com/RaetProtocol/raet

https://github.com/RaetProtocol/raet/blob/master/docs/topics/tutorial.rst

It is used in production by SaltStack and Plenum.

ZeroMQ/ZMTP/Malamute vs Reactive Streams

Can anyone with ZeroMQ experience provide insight into the work being done on Malamute (https://github.com/zeromq/malamute/blob/master/MALAMUTE.md), or perhaps ZMTP (http://zmtp.org)?

Malamute uses credit-based flow control (CBFC) to manage the buffering of data from broker to client.

Malamute seems to have overlapping goals and approaches, so I'd like a good understanding of it, both to learn from it and, if we proceed with RS.io, to know why we need a new protocol as opposed to using ZeroMQ.
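For readers unfamiliar with credit-based flow control, the general idea can be sketched as below. This is a generic illustration, not Malamute's actual implementation or API: `CreditedSender` is a hypothetical name, and counting credit in messages is an assumption made to line up with request(n).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Generic illustration of credit-based flow control. All names are
// hypothetical and credit is counted in messages by assumption.
class CreditDemo {

    /** Sender side: holds messages until the receiver has granted enough credit. */
    static final class CreditedSender {
        private final Queue<String> pending = new ArrayDeque<>();
        private final List<String> wire = new ArrayList<>();  // stands in for the socket
        private long credit;

        /** Receiver grants permission for n more messages (comparable to request(n)). */
        void grant(long n) {
            credit += n;
            flush();
        }

        /** Application offers a message; it is sent only if credit is available. */
        void offer(String message) {
            pending.add(message);
            flush();
        }

        /** Moves buffered messages to the wire while credit remains. */
        private void flush() {
            while (credit > 0 && !pending.isEmpty()) {
                wire.add(pending.remove());
                credit--;
            }
        }

        List<String> sent() {
            return wire;
        }

        int buffered() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        CreditedSender sender = new CreditedSender();
        sender.offer("a");
        sender.offer("b");
        sender.offer("c");
        System.out.println("before credit: sent=" + sender.sent() + " buffered=" + sender.buffered());
        sender.grant(2);
        System.out.println("after grant(2): sent=" + sender.sent() + " buffered=" + sender.buffered());
    }
}
```

The buffering here sits on the sending side and is unbounded, which is one of the design questions a comparison with Reactive Streams would need to address.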

Preliminary Work and IoT Use Case

Howdy, folks.

A little background on our use case:

I work on an open source IoT platform, zettajs/zetta. As you can imagine, streaming data is a pretty important concept when exposing sensor data. We use HTTP and WebSockets for the client-to-server (C2S) protocol, layering similar semantics over the multiplexed SPDY protocol for server-to-server (S2S) communication.

Currently, we use an API style called hypermedia for offering links to data streams. Some problems we're looking to solve:

  1. Multiplex data streams. Each data stream in the C2S protocol uses its own WebSocket connection today. We need to define a new protocol that allows multiple stream subscriptions over a single socket and, optionally, over multiple sockets. Why is a new protocol important? In addition to defining the interaction, we need to define a scheme indicating connection/subscription info using only URIs (again, this jibes with our hypermedia approach).
  2. We need to solve the producer-consumer problem. The pull-based semantics of Reactive Streams are exactly what we have been intending to model for quite some time now. I'm hoping to make time for this soon. Mobile clients are often cited as a good reason to do this, but that's not the only use case. Allowing consumers to have control offers a lot, including consumer-initiated stream sampling.

My intention has been to define this using a protocol that would work over plain TCP and map those semantics to work over WebSockets. For Zetta, this would be strictly for the C2S protocol at this time. The S2S protocol is a little more complex, involving WebSockets, SPDY, SPDY Server Push, and swapping client/server roles between open connections (I won't go deeper, but trust me, it makes sense). We'll handle the mapping at that level, but I'm not sure how much of that would be beneficial for the greater community.

So, my question: Has there been any preliminary work defining Reactive Streams interaction over TCP? If so, where might I find a document or discussion? As much as I enjoy making a 15th standard (and I truly do), I'd love to be able to combine efforts if it makes sense. I can share my thinking, as well, as soon as I gather my notes and draft a document.

Thanks!
