
Comments (47)

benjchristensen avatar benjchristensen commented on August 15, 2024

/cc @smaldini

from rxjavareactivestreams.

rkuhn avatar rkuhn commented on August 15, 2024

I’m currently looking into trying out some interop with Akka Streams, are artifacts published somewhere?


ldaley avatar ldaley commented on August 15, 2024

You’ll have to hold off. It’s not spec compliant yet. Once it is I’ll get some artefacts published.


rkuhn avatar rkuhn commented on August 15, 2024

Bummer; I guess I’ll look into ratpack and Reactor then (want to show something at conferences next week).


ldaley avatar ldaley commented on August 15, 2024

I’m actively working on this right now. Should have it working in the next day or two.


rkuhn avatar rkuhn commented on August 15, 2024

Cool, would you please ping this issue when I should try it out? Thanks!


ldaley avatar ldaley commented on August 15, 2024

What I've just pushed should be compliant, and I think it is good enough for an initial release. There are probably bugs and there are certainly performance problems, but I think it can go out.

@benjchristensen how do we get a binary out?


benjchristensen avatar benjchristensen commented on August 15, 2024

I'll push the buttons (since we haven't yet got the fully self-serve Travis-based release process working like RxScala has).


benjchristensen avatar benjchristensen commented on August 15, 2024

0.2.0 is released on BinTray (https://bintray.com/reactivex/RxJava/RxJavaReactiveStreams/0.2.0/view) and is making its way to Maven Central.

@alkemist Thank you for your work on this. Would you mind providing a section on the README or in the Wiki with basic usage examples?


smaldini avatar smaldini commented on August 15, 2024

Looking forward to it for my next demos :D

On Wed, Oct 29, 2014 at 5:23 PM, Ben Christensen [email protected]
wrote:

0.2.0 is released on BinTray (
https://bintray.com/reactivex/RxJava/RxJavaReactiveStreams/0.2.0/view)
and is making its way to Maven Central.

@alkemist https://github.com/alkemist Thank you for your work on this.
Would you mind providing a section on the README or in the Wiki with basic
usage examples?



Stéphane


benjchristensen avatar benjchristensen commented on August 15, 2024

It can be seen on Maven Central now: http://repo1.maven.org/maven2/io/reactivex/rxjava-reactive-streams/0.2.0/


rkuhn avatar rkuhn commented on August 15, 2024

Thanks a lot, @benjchristensen and @alkemist!


ldaley avatar ldaley commented on August 15, 2024

Pushed one example of interop with Ratpack: https://github.com/ReactiveX/RxJavaReactiveStreams/blob/0.x/examples/ratpack/src/test/java/rx/reactivestreams/example/ratpack/RatpackExamples.java#L46-46

Leaving this ticket open for more examples and some stuff in the README.


benjchristensen avatar benjchristensen commented on August 15, 2024

Cool, nice to see that code, that's very helpful.


rkuhn avatar rkuhn commented on August 15, 2024

@alkemist I’m trying to extract the sample code you linked to (since I cannot figure out how to run the tests you point to). The problem I am facing is that I cannot figure out how to publish ratpack 0.9.10-SNAPSHOT locally, what is the magic incantation? I tried adding the maven-publish plugin but that does not do anything when I say ./gradlew publishToMavenLocal (and ratpack-rx:publishToMavenLocal does not exist).


smaldini avatar smaldini commented on August 15, 2024

@rkuhn can you use this repo: maven { url "http://oss.jfrog.org/repo" }? I can't remember for sure, but I think this is where all the Ratpack snapshots go. BTW, if you can share your example I want to complete it with Reactor, as I keep pitching: Reactor for the backend access/data layer, Akka Streams to get into an Actor system and scale out, RxJava to bridge with some metrics and especially Hystrix right now, Ratpack to bridge with the HTTP client (WS/SSE) 💃


rkuhn avatar rkuhn commented on August 15, 2024

Got it working, thanks! You can find my sample project here. @benjchristensen this might be interesting for you as well.


smaldini avatar smaldini commented on August 15, 2024

@rkuhn I'm having a hard time configuring the sample build as an IDEA project :(


benjchristensen avatar benjchristensen commented on August 15, 2024

Thanks @rkuhn ... that code looks like a good example for my talks next week as well. Do you mind if I use it (and possibly tweak/enhance it)?


rkuhn avatar rkuhn commented on August 15, 2024

@benjchristensen By all means: use it!

@smaldini That is the reason why I created an sbt project, I cannot figure out this gradle thing ;-) You should be able to just add a gradle build if you know better how that works (I don’t use IDEA).


rkuhn avatar rkuhn commented on August 15, 2024

@smaldini I added Reactor to my sample: Java and Scala. It works!


smaldini avatar smaldini commented on August 15, 2024

Beautiful


smaldini avatar smaldini commented on August 15, 2024

I am trying to convert the project to our own sample suite as I am having issues with both. Do you know where I can find 0.9 snapshots for Akka Streams? My build seems to complain about that.


rkuhn avatar rkuhn commented on August 15, 2024

Ah, yes: I am about to publish a suitable version, should not take more than an hour.


smaldini avatar smaldini commented on August 15, 2024

I'll also propose we each use our own schedulers/dispatchers, to make it more obvious that we are talking to each other while handling async backpressure 👯

E.g. rxjava.observeOn(Schedulers.computation()), reactorStream.dispatchOn(new Environment())


rkuhn avatar rkuhn commented on August 15, 2024

Akka Streams 0.10-M1 is on its way to Maven Central; I’ll update my code as soon as it is there.


rkuhn avatar rkuhn commented on August 15, 2024

updated the build and it still works :-)


benjchristensen avatar benjchristensen commented on August 15, 2024

Using Akka Streams 0.10-M1 I was playing around and I've found an issue somewhere that the backpressure isn't propagating. Not sure yet where.

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;

import rx.Observable;
import rx.RxReactiveStreams;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;
import akka.actor.ActorSystem;
import akka.stream.FlowMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class RxAkka {

    public static void main(String... args) {
        final ActorSystem system = ActorSystem.create("InteropTest");
        final FlowMaterializer mat = FlowMaterializer.create(system);

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
            // schedule odd and even on different event loops
                Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation());
                /* using Akka Streams */
                // convert to Reactive Streams Publisher
                Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);
                // convert to Akka Streams Source
                Source<String> stringSource = Source.from(groupPublisher).map(i -> i + " " + group.getKey());
                // convert back from Akka to Rx Observable
                return RxReactiveStreams.toObservable(stringSource.take(2000).runWith(Sink.<String> fanoutPublisher(1, 1), mat));

                /* using only Rx */
                //                return asyncGroup.take(2000).map(i -> i + " " + group.getKey());
            });

        strings.toBlocking().forEach(System.out::println);
        system.shutdown();
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}

This non-deterministically blows up with:

Exception in thread "main" java.lang.RuntimeException: rx.exceptions.MissingBackpressureException
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:138)
    at reactive_streams_interop.RxAkka.main(RxAkka.java:45)
Caused by: rx.exceptions.MissingBackpressureException
    at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:222)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$1$3.onNext(OperatorGroupBy.java:236)
    at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:278)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:182)
    at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
    at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:93)
    at rx.Subscriber.setProducer(Subscriber.java:143)
    at rx.Subscriber.setProducer(Subscriber.java:137)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:39)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable.subscribe(Observable.java:7463)
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:98)
    ... 1 more
[ERROR] [11/01/2014 22:45:52.646] [InteropTest-akka.actor.default-dispatcher-16] [akka://InteropTest/user/$a/flow-2-1-map] failure during processing
rx.exceptions.MissingBackpressureException
    at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:222)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$1$3.onNext(OperatorGroupBy.java:236)
    at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:278)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:182)
    at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
    at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:93)
    at rx.Subscriber.setProducer(Subscriber.java:143)
    at rx.Subscriber.setProducer(Subscriber.java:137)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:39)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable.subscribe(Observable.java:7463)
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:98)
    at reactive_streams_interop.RxAkka.main(RxAkka.java:45)

This means the request flow isn't working.

It sometimes works however:

3998 true
4000 true
Number emitted from source (should be < 6000): 7055
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-fanoutPublisher] Message [akka.stream.impl.Cancel] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-fanoutPublisher#-718931434] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-2-take#-1094054996] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-2-take#-1094054996] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-fanoutPublisher] Message [akka.stream.impl.Cancel] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-fanoutPublisher#51622231] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

It works deterministically if I use just Rx without the conversion to/from. I have not yet spent time to hunt down where the issue is occurring.


benjchristensen avatar benjchristensen commented on August 15, 2024

The above is done with:

compile 'io.reactivex:rxjava:1.0.0.rc.8'
compile 'io.reactivex:rxjava-reactive-streams:0.3.0'
compile 'com.typesafe.akka:akka-stream-experimental_2.11:0.10-M1'


benjchristensen avatar benjchristensen commented on August 15, 2024

Here is an example I'm considering using for a presentation on Tuesday. It's buggy right now (as shown above) but demonstrates the goals of interop while going through non-trivial operators (groupBy and flatMap) along with injected concurrency and thread-hopping.

[image: rxjava-akka-streams-interop-example]

Any recommendations on what to do differently that would be better? Can someone provide me a more realistic example of going from or to Akka Streams? I'd prefer to have something that is not so contrived if possible.

@smaldini I'll play more with yours next as shown in https://github.com/rkuhn/ReactiveStreamsInterop/blob/7124906fb50f9a91cee4e8d58c00853898eed239/src/main/java/com/rolandkuhn/rsinterop/JavaMain.java


benjchristensen avatar benjchristensen commented on August 15, 2024

This example with Reactor and RxJava seems to be working deterministically:

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;

import reactor.rx.Stream;
import reactor.rx.Streams;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

public class RxReactor {

    public static void main(String... args) {

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
            // schedule odd and even on different event loops
                Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation());

                // convert to Reactive Streams Publisher
                Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);

                // Convert to Reactor Stream
                final Stream<String> linesStream = Streams.create(groupPublisher).map(i -> i + " " + group.getKey()).take(2000);

                // convert back from Reactor Stream to Rx Observable
                return RxReactiveStreams.toObservable(linesStream);
            });

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}

Output ends with:

3987 false
3989 false
3991 false
3993 false
3995 false
3997 false
3999 false
Number emitted from source (should be < 6000): 4098
// sometimes with this (which is okay ... concurrency races) ...
Number emitted from source (should be < 6000): 6147

@smaldini Do you have a better example you'd like me to show?


benjchristensen avatar benjchristensen commented on August 15, 2024

[image: rxjava-reactor-interop-example]


benjchristensen avatar benjchristensen commented on August 15, 2024

@rkuhn Are you okay with me using the following slide? Is there anything you'd like me to change?

[image: rxjava akka-streams]

If you can provide me a more realistic example I'd happily change. For example an Akka Actor that acts as a source to RxJava for consumption.


benjchristensen avatar benjchristensen commented on August 15, 2024

@smaldini I can't find a logo for Reactor, is there something I should use?


benjchristensen avatar benjchristensen commented on August 15, 2024

@alkemist Is there an example you'd like me to show in my presentation at QCon? Are you close to publishing to Maven Central a ratpack-rx version that supports RxJava 1.0? I tried with 0.9.9 but that isn't working (only 17 more days until 1.0 Final and no more breaking changes!).


ldaley avatar ldaley commented on August 15, 2024

@benjchristensen 0.9.10 was released today. It's in Jcenter but a successful sync to Central is proving elusive. Regarding what to demo, I just pushed examples for SSE and WebSocket streaming using publishers. Chunks, SSE and WebSocket are really the only useful ways we leverage RS streams ATM.

Up to you what you want to show.


rkuhn avatar rkuhn commented on August 15, 2024

@benjchristensen The Akka part of the sample is fine, you can make it arbitrarily complex with all the operators out there.

The problem you are seeing very likely is that Akka streams buffer less elements, exposing the deadlock in your sample code: following a groupBy with a flatMap can only work under very specific circumstances, in general it is not a safe combination under bounded memory processing (since the even numbers will need to be buffered for as long as it takes to finish all the odd ones).
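To make the buffering concern concrete, here is a minimal sketch in plain Java, with no streaming library involved. It assumes one particular reading of the scenario: a source of 1..n is split into odd and even groups, and it compares a consumer that finishes the odd group before touching the even one with a consumer that takes from both groups as values arrive. The class and method names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class GroupBufferingSketch {

    /** Odd values are consumed at once; even values wait until the source is done. */
    static int peakEvenBufferSequential(int n) {
        Deque<Integer> evens = new ArrayDeque<>();
        int peak = 0;
        for (int i = 1; i <= n; i++) {
            if (i % 2 == 0) {
                evens.add(i); // nobody is reading the even group yet
                peak = Math.max(peak, evens.size());
            }
        }
        evens.clear(); // the even group is drained only after the odd group finished
        return peak;
    }

    /** Both groups are consumed as soon as a value arrives. */
    static int peakEvenBufferInterleaved(int n) {
        Deque<Integer> evens = new ArrayDeque<>();
        int peak = 0;
        for (int i = 1; i <= n; i++) {
            if (i % 2 == 0) {
                evens.add(i);
                peak = Math.max(peak, evens.size());
                evens.poll(); // the even consumer takes the value right away
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("sequential peak:  " + peakEvenBufferSequential(1000));  // 500
        System.out.println("interleaved peak: " + peakEvenBufferInterleaved(1000)); // 1
    }
}
```

Under these assumptions the sequential consumer needs a buffer that grows with the input (n/2 entries), while the interleaved consumer needs a single slot. Which of the two a given groupBy + flatMap pipeline resembles depends on how its inner streams are subscribed and scheduled.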

Concerning the exception itself: looking at the lines referenced in the stack trace it seems that data are just pushed too eagerly within the part leading up to observeOn, either by not respecting the signaled demand or by observeOn signaling more than it can take in. Do the RxJava or Reactor versions also work if you insert a Thread.sleep(1) within the map operation?


benjchristensen avatar benjchristensen commented on August 15, 2024

@alkemist Thanks, I'll try that.


benjchristensen avatar benjchristensen commented on August 15, 2024

@rkuhn

"very likely is that Akka streams buffer less elements"

How much it buffers shouldn't matter, as request(n) will determine how much it requests. One impl should be able to request(1) and another request(Long.MAX_VALUE), and it should still work.
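As a rough illustration of the request(n) point, the sketch below uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones, so that it runs without extra dependencies. That substitution is an assumption made for this example; it is not the RxJavaReactiveStreams code. The publisher is single-threaded and synchronous, not a fully spec-compliant implementation, and the names DemandSketch, range and collect are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class DemandSketch {

    /** Synchronous publisher of 1..count that never emits more than was requested. */
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 1;
            long demand = 0;
            boolean emitting = false;
            boolean done = false;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be > 0"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) return; // re-entrant call from onNext: the outer loop continues
                emitting = true;
                while (demand > 0 && next <= count && !done) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next > count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Collects all items, requesting `batch` at a time. */
    static List<Integer> collect(Flow.Publisher<Integer> source, long batch) {
        List<Integer> items = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription subscription;
            long remaining;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                remaining = batch;
                s.request(batch);
            }
            @Override public void onNext(Integer item) {
                items.add(item);
                if (--remaining == 0) { // batch used up: ask for the next one
                    remaining = batch;
                    subscription.request(batch);
                }
            }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return items;
    }

    public static void main(String[] args) {
        System.out.println(collect(range(10), 1));              // one item per request
        System.out.println(collect(range(10), Long.MAX_VALUE)); // effectively unbounded
    }
}
```

Both subscribers receive the same ten items in the same order; only the number of request calls differs. A publisher that honours demand should be indifferent to the batch size its subscriber chooses.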

"exposing the deadlock in your sample code: following a groupBy with a flatMap can only work under very specific circumstances"

What do you mean by this? The groupBy and flatMap use cases work and correctly slow down (i.e. apply backpressure) to whatever the slowest group is. It would only "deadlock", as you call it, if you completely blocked one group, which is exactly what should happen in a backpressure case. The user is free to decouple fast and slow streams if they wish by using any of a number of operators that make it behave differently (such as sample, throttle, onBackpressureBuffer, onBackpressureDrop, debounce, buffer, window, etc).
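For readers unfamiliar with the decoupling strategies named above, here is a plain-Java analogue of two of them: dropping values when there is no downstream demand, and buffering them until demand arrives. It is only a sketch of the idea behind operators such as onBackpressureDrop and onBackpressureBuffer, not RxJava's implementation; the Relay class and its methods are hypothetical, single-threaded, and unbounded in buffer mode.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class DecouplingSketch {

    /** Sits between a producer that ignores demand and a consumer that signals it. */
    static final class Relay {
        private final boolean dropWhenNoDemand;
        private final Queue<Integer> buffer = new ArrayDeque<>();
        private final List<Integer> delivered = new ArrayList<>();
        private long demand;
        private int dropped;

        Relay(boolean dropWhenNoDemand) {
            this.dropWhenNoDemand = dropWhenNoDemand;
        }

        /** Called by the consumer: first serve buffered values, then keep the rest as demand. */
        void request(long n) {
            demand += n;
            while (demand > 0 && !buffer.isEmpty()) {
                demand--;
                delivered.add(buffer.poll());
            }
        }

        /** Called by the producer for every value, whether or not there is demand. */
        void onNext(int value) {
            if (demand > 0) {
                demand--;
                delivered.add(value);
            } else if (dropWhenNoDemand) {
                dropped++;          // drop strategy: the value is lost
            } else {
                buffer.add(value);  // buffer strategy: the value waits for demand
            }
        }

        List<Integer> delivered() { return delivered; }
        int dropped() { return dropped; }
        int buffered() { return buffer.size(); }
    }

    /** Consumer asks for 3, producer pushes 1..10, consumer then asks for 2 more. */
    static Relay run(boolean dropWhenNoDemand) {
        Relay relay = new Relay(dropWhenNoDemand);
        relay.request(3);
        for (int i = 1; i <= 10; i++) {
            relay.onNext(i);
        }
        relay.request(2);
        return relay;
    }

    public static void main(String[] args) {
        Relay drop = run(true);
        System.out.println("drop:   " + drop.delivered() + " dropped=" + drop.dropped());
        Relay buf = run(false);
        System.out.println("buffer: " + buf.delivered() + " buffered=" + buf.buffered());
    }
}
```

With the drop strategy the consumer sees 1, 2, 3 and the other seven values are discarded; with the buffer strategy it sees 1 through 5 and five values remain queued. Either way the producer is never slowed down, which is the trade-off these strategies make in exchange for decoupling.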

"it seems that data are just pushed too eagerly within the part leading up to observeOn"

That's correct, hence the problem. It works with Rx+Reactor and with Rx by itself, which implies that the request(n) is not correctly being composed through when it is Rx+Akka. It may be the RxJavaReactiveStream implementation, but Rx+Reactor are working with the exact same use case.

I have not yet spent the time to debug where too much is being requested or the request is being lost.

I'll paste examples below using Thread.sleep to demonstrate that it works with RxJava by itself or with RxJava+Reactor.


benjchristensen avatar benjchristensen commented on August 15, 2024

Example of RxJava + Reactor where the even group takes 10ms per item and the odd group takes 1ms per item

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;

import reactor.rx.Stream;
import reactor.rx.Streams;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

public class RxReactor {

    public static void main(String... args) {

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
            // schedule odd and even on different event loops
                Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation()).map(i -> {
                    if (group.getKey()) {
                        // make 'even' group be slow
                        try {
                            Thread.sleep(10);
                        } catch (Exception e) {
                        }
                    } else {
                        // make 'odd' group be faster but still have some "computational cost"
                        try {
                            Thread.sleep(1);
                        } catch (Exception e) {
                        }
                    }
                    return i;
                });

                // convert to Reactive Streams Publisher
                Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);

                // Convert to Reactor Stream
                final Stream<String> linesStream = Streams.create(groupPublisher).map(i -> i + " " + group.getKey()).take(2000);

                // convert back from Reactor Stream to Rx Observable
                return RxReactiveStreams.toObservable(linesStream);
            });

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}

You end up with output such as this:

1 false
3 false
5 false
7 false
9 false
11 false
13 false
15 false
17 false
2 true
19 false
21 false
23 false
...
3990 true
3992 true
3994 true
3996 true
3998 true
4000 true
Number emitted from source (should be < 6000): 4097


benjchristensen avatar benjchristensen commented on August 15, 2024

RxJava by itself doing groupBy + flatMap:

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

public class RxJavaGroupByFlatMap {

    public static void main(String... args) {

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
            // schedule odd and even on different event loops
                return group.observeOn(Schedulers.computation()).map(i -> {
                    if (group.getKey()) {
                        // make 'even' group be slow
                        try {
                            Thread.sleep(10);
                        } catch (Exception e) {
                        }
                    } else {
                        // make 'odd' group be faster but still have some "computational cost"
                        try {
                            Thread.sleep(1);
                        } catch (Exception e) {
                        }
                    }
                    return i;
                }).map(i -> i + " " + group.getKey()).take(2000);

            });

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}


benjchristensen avatar benjchristensen commented on August 15, 2024

Here is one that multicasts a stream (using publish), creates two async/parallel streams off of it (odd and even, using filtering), then zips them together with one of the streams being slow.

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.schedulers.Schedulers;

public class RxJavaPublishZip {

    public static void main(String... args) {
        final AtomicInteger numEmitted = new AtomicInteger();

        Observable<Object> strings = Observable.range(1, 1000000).doOnNext(i -> numEmitted.incrementAndGet())
                .publish(oi -> {
                    // schedule it so we are async and need backpressure
                        Observable<String> odd = oi.observeOn(Schedulers.computation())
                                .filter(i -> i % 2 != 0).map(i -> i + "-odd").map(s -> {
                                    // make odd slow
                                        try {
                                            Thread.sleep(1);
                                        } catch (Exception e1) {
                                        }
                                        return s;
                                    });
                        Observable<String> even = oi.observeOn(Schedulers.computation())
                                .filter(i -> i % 2 == 0).map(i -> i + "-even");
                        return Observable.zip(odd, even, (o, e) -> o + " " + e + "   Thread: " + Thread.currentThread());
                    }).take(2000);

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be ~4000): " + numEmitted.get());
    }
}

Output would be like this:

1-odd 2-even   Thread: Thread[RxComputationThreadPool-3,5,main]
3-odd 4-even   Thread: Thread[RxComputationThreadPool-3,5,main]
5-odd 6-even   Thread: Thread[RxComputationThreadPool-3,5,main]
7-odd 8-even   Thread: Thread[RxComputationThreadPool-3,5,main]
9-odd 10-even   Thread: Thread[RxComputationThreadPool-3,5,main]
11-odd 12-even   Thread: Thread[RxComputationThreadPool-4,5,main]
13-odd 14-even   Thread: Thread[RxComputationThreadPool-3,5,main]
... 
3995-odd 3996-even   Thread: Thread[RxComputationThreadPool-3,5,main]
3997-odd 3998-even   Thread: Thread[RxComputationThreadPool-3,5,main]
3999-odd 4000-even   Thread: Thread[RxComputationThreadPool-3,5,main]
Number emitted from source (should be ~4000): 4864

You can see the backpressure happening because the num emitted is < 5000 in this case rather than spinning and emitting all 1000000. It is not exactly 4000 because we are async and allow buffers in observeOn to fill.

If I put take on the individual streams then it would be exact, as it would propagate through, but via zip it can't do that as it splits streams, so each stream maintains its own backpressure and zip also maintains its own.


ldaley avatar ldaley commented on August 15, 2024

@benjchristensen do you have in mind anything more than what we have now in terms of the general pattern? The proposed pattern is to have more projects in examples/ integrating different libraries/frameworks.

I'd like to encourage the respective library/framework owners to contribute examples. I don't intend to do it.


ldaley avatar ldaley commented on August 15, 2024

Looking at the readme again, we probably do want at least one example of real interop there.


benjchristensen avatar benjchristensen commented on August 15, 2024

@alkemist The examples above are sufficient, but we should get them into /examples or something like that as you suggest, and probably have the simple ones presented in the README or wiki, similar to how my slides give a quick intro: https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=158


benjchristensen avatar benjchristensen commented on August 15, 2024

@rkuhn Any thoughts on the examples above? I'm interested in your perspective since you stated that groupBy and flatMap are not safe.


rkuhn avatar rkuhn commented on August 15, 2024

Sorry for the delay, I’m traveling too much this month; I’ll try to elaborate later this week, otherwise we can talk it through next week at React.
