rxjavaextensions's People

Contributors

akaita, akarnokd, cerisier, dalinaum, dano, dependabot-preview[bot], dependabot[bot], leventov, martinnowak, piomar123, swimmesberger, technoir42, tomdotbradshaw

rxjavaextensions's Issues

Separate out RxJavaAssemblyTracking

I'd like to include RxJavaAssemblyTracking in a project, but don't need the other stuff. Would it be possible for RxJavaAssemblyTracking to be its own lib?

RxJavaAssemblyException Doesn't Show Stack Trace

I noticed some differences between RxJavaAssemblyException and RxJava1's AssemblyStackTraceException, namely that the current implementation doesn't pass the stack trace as a message to super.

This makes it somewhat difficult to track call sites of streams (without having a debugger attached all the time, or adding some custom logger implementation to specifically log these types of exceptions using the find method).

Is this by design? Maybe for clarity when dealing with many chained exceptions?

You also mentioned in the readme that the format is up for discussion - does this mean we'll see support for this in the main RxJava2 repo at some point?

Happy to submit a PR if there's any work needed here (maybe some flags to set what level of logging a user would like e.g. SHOW_ALL_STACKTRACES, SHOW_LAST_STACKTRACE, HIDE_ALL_STACKTRACES).

Buffered alternate parallel and non-parallel executions

Is RxJava2Extensions suitable for the following scenario?

I have a stream of data coming from a DB query.
I need to process the data in two steps: the first step in parallel and the second one in non-parallel mode. Step1 and Step2 are mutually exclusive.

Requirements:

  • start step1 immediately as data arrives, without buffering
  • before passing data to step2, collect it in a buffer
  • during the step2 computation (buffer after buffer), step1 must be blocked

Pseudo code:

DATA.forEach.do{

parallelExecute { compute_step1(dataItem) }

buffer(1000)

nonParallelExecute { compute_step2(dataItem) }

outputResults // buffer after buffer
}
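
One way to read these requirements, sketched with plain JDK executors rather than with any RxJava2Extensions operator: step1 tasks are submitted as soon as an item arrives, and once `bufferSize` of them are in flight the caller waits for them and runs step2 on the collected buffer. Because the caller submits no new step1 work while step2 runs, the two steps never overlap. The class name `AlternatingStepsSketch`, the method `run` and its parameters are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper, not part of RxJava or RxJava2Extensions.
final class AlternatingStepsSketch {

    static <T, A, B> List<B> run(Iterable<T> data, int bufferSize, int threads,
            Function<T, A> step1, Function<List<A>, List<B>> step2) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<B> output = new ArrayList<>();
        List<Future<A>> inFlight = new ArrayList<>(bufferSize);
        try {
            for (T item : data) {
                // step1 starts on the pool as soon as the item arrives
                Callable<A> task = () -> step1.apply(item);
                inFlight.add(pool.submit(task));
                if (inFlight.size() == bufferSize) {
                    output.addAll(drain(inFlight, step2));
                }
            }
            if (!inFlight.isEmpty()) {
                // last, possibly partial, buffer
                output.addAll(drain(inFlight, step2));
            }
            return output;
        } finally {
            pool.shutdown();
        }
    }

    // Waits for all pending step1 results (in submission order), then runs
    // step2 on the calling thread; nothing is submitted to the pool meanwhile.
    private static <A, B> List<B> drain(List<Future<A>> inFlight,
            Function<List<A>, List<B>> step2) {
        List<A> buffer = new ArrayList<>(inFlight.size());
        try {
            for (Future<A> future : inFlight) {
                buffer.add(future.get());
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        }
        inFlight.clear();
        return step2.apply(buffer);
    }
}
```

The trade-off in this sketch is that reading from the source also pauses while step2 runs, which is what keeps step1 blocked.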

Proposal: Executor - Scheduler interop

Right now RxJava has support for creating a Scheduler from an Executor, but not the other way around. This could be useful for allowing shared use of a scheduler's thread pool for APIs that don't necessarily speak RxJava.

Is it feasible to create a Java Agent similar to Reactor's Debug Agent?

I discovered that Project Reactor provides a Java Agent that does bytecode manipulation to do its equivalent of RxJava's assembly tracking (called "debug mode"), but without any runtime performance penalty. Naturally, this made me curious whether RxJava could use a similar technique to get the benefit of assembly tracking without the high cost of generating stack traces. Does it seem possible, or are there differences in the underlying implementations of each that would make it infeasible or impossible for RxJava?

Specify exact buffer size to Unicast-/DispatchWorkSubject

Using UnicastWorkSubject, DispatchWorkSubject or DispatchWorkProcessor with the create(Int) function, we can specify a "hint" capacity size. But if I want to buffer only the last item and receive it in onSubscribe(), then create(1) doesn't work as intended: I still receive many of the cached events. Is there any other way to do that?

Operator ideas (from Stackoverflow)

  • cacheLast (already in reactor-core) - cache the very last value, like AsyncProcessor
  • timeoutLast - emit the last received value if the timeout passed or the upstream completes.
  • switchFlatMap - like switchMap but keep a fixed pool of active sources and cancel the oldest if a new source arrives
  • intervalBackpressure - like interval but without the MissingBackpressureException (MBE) if the downstream can't keep up; a late request then gets all the missed signals immediately.
  • debounceFirst - similar to debounce but emits the first item of the window and then keeps extending the window if more upstream items arrive. The 'gate' is then only reopened if the specified delay elapses after the last emission.

Union of multiple observables

Is there an operator or an easy way to generate a union of elements from different observables? Meaning that only elements that exist in all observables should be pushed downstream. It would also be great if there could be an option to define the comparator.

Meaning, I would expect:

A = [1, 2, 4, 5]
B = [2, 5, 7]
C = [0, 1, 2, 5, 7]

UNION (A, B, C) = [2, 5]

Maybe something like this exists in RxJava itself?
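
The result described above (only elements present in every source) is what is usually called a set intersection. A minimal sketch of those semantics with plain collections and a caller-supplied comparator follows; it is not an RxJava operator, and the names `CommonElementsSketch` and `commonElements` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper illustrating the requested semantics on finite sources.
final class CommonElementsSketch {

    static <T> List<T> commonElements(List<? extends Iterable<? extends T>> sources,
            Comparator<? super T> comparator) {
        TreeSet<T> common = null;
        for (Iterable<? extends T> source : sources) {
            // Equality is decided by the comparator, not by equals()
            TreeSet<T> seen = new TreeSet<>(comparator);
            for (T item : source) {
                seen.add(item);
            }
            if (common == null) {
                common = seen;            // first source: everything is a candidate
            } else {
                common.retainAll(seen);   // keep only candidates also seen here
            }
        }
        return common == null ? new ArrayList<>() : new ArrayList<>(common);
    }
}
```

Note that a result can only be produced once every source has been read to the end, so a reactive version along these lines would need all sources to complete before emitting.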

expand() delays errors when using DEPTH_FIRST

I'm wondering if the following behavior is intended or not:

Flowable.just(10, 5)
        .compose(FlowableTransformers.expand(v -> {
            if (v == 9)
                return Flowable.error(new Throwable("error"));
            else if (v == 0)
                return Flowable.empty();
            else
                return Flowable.just(v - 1);
        }, ExpandStrategy.DEPTH_FIRST))
        .test()
        .assertErrorMessage("error")
        .assertValues(10, 9); // fails! actual: 10, 9, 5, 4, 3, 2, 1, 0
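
The ordering this test expects corresponds to an eager depth-first traversal that stops at the first failure. A plain-Java sketch of that expectation is given here; it says nothing about how the expand() operator is actually implemented, and `DepthFirstExpandSketch` and `expandDepthFirst` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper: eager depth-first expansion over finite lists.
final class DepthFirstExpandSketch {

    static <T> void expandDepthFirst(List<T> roots, Function<T, List<T>> expander,
            Consumer<T> onNext) {
        Deque<Iterator<T>> stack = new ArrayDeque<>();
        stack.push(roots.iterator());
        while (!stack.isEmpty()) {
            Iterator<T> top = stack.peek();
            if (!top.hasNext()) {
                stack.pop();              // this level is exhausted, go back up
                continue;
            }
            T value = top.next();
            onNext.accept(value);
            // Expand the value just visited before touching its siblings;
            // an exception thrown here ends the traversal immediately.
            stack.push(expander.apply(value).iterator());
        }
    }
}
```

With roots 10 and 5 and an expander that fails on 9, this sketch visits 10 and 9 and then propagates the failure without ever reaching 5.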

[3.x] RxJavaAssemblyTracking stack trace is not printed

My test code:

package me.piomar.dataflow;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import hu.akarnokd.rxjava3.debug.RxJavaAssemblyTracking;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class RxErrorApp {

    private static final Logger LOGGER = LoggerFactory.getLogger(RxErrorApp.class);

    public static void main(String[] args) {
        LOGGER.info("App start");
        RxJavaAssemblyTracking.enable();

        List<String> list = Flowable.just("a", "b", "c")
                                    .observeOn(Schedulers.computation())
                                    .map(RxErrorApp::bang)
                                    .toList()
                                    .blockingGet();
        LOGGER.info("Result: {}", list);
    }

    private static String bang(String input) {
        throw new IllegalStateException("bang");
    }
}

The result in console:

Exception in thread "main" java.lang.IllegalStateException: bang
	at me.piomar.dataflow.RxErrorApp.bang(RxErrorApp.java:29)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:64)
	at hu.akarnokd.rxjava3.debug.FlowableOnAssembly$OnAssemblySubscriber.onNext(FlowableOnAssembly.java:61)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runSync(FlowableObserveOn.java:337)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:174)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:65)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:56)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: hu.akarnokd.rxjava3.debug.RxJavaAssemblyException
Caused by: hu.akarnokd.rxjava3.debug.RxJavaAssemblyException

As can be seen, no stack trace is printed for RxJavaAssemblyException. I'm using OpenJDK 11, compiling to Java 8 (tested on JDK 8 with the same result). While debugging, I can see that the String field RxJavaAssemblyException.stacktrace has been filled in properly, but it seems it's not used anywhere.

Maven dependencies:

        <dependency>
            <groupId>com.github.akarnokd</groupId>
            <artifactId>rxjava3-extensions</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.0.5</version>
        </dependency>

Flowables.orderedMerge NoSuchMethodError

Stackoverflow

Hi, I am a Scala developer and tried to use this extension to do an orderedMerge, but it failed.
In Ammonite, I execute the following code, trying to do an orderedMerge of two Flowables:

import $ivy.{
    `io.circe::circe-generic:0.13.0`                    ,
    `io.circe::circe-parser:0.13.0`                     ,
    `io.circe::circe-optics:0.13.0`                     ,
    `com.softwaremill.sttp::core:1.7.2`                 ,
    `org.scalaz::scalaz-core:7.2.27`                    ,
    `com.lihaoyi::requests:0.2.0`                       ,
    `io.get-coursier::coursier-core:2.0.0-RC4`          ,
    `io.lemonlabs::scala-uri:1.4.10`                    ,
    `net.liftweb::lift-json:3.4.0`                      ,
    `io.reactivex.rxjava3:rxjava:3.0.3`                 ,
    `com.github.akarnokd:rxjava3-extensions:3.0.0-RC7`
}

import io.lemonlabs.uri._
implicit val formats = net.liftweb.json.DefaultFormats
import net.liftweb.json.JsonAST._
import net.liftweb.json.Extraction._
import net.liftweb.json._
import io.reactivex.rxjava3.core._
import io.reactivex.rxjava3.functions._
import net.liftweb.json.JsonDSL._
import collection.JavaConverters._
import io.reactivex.rxjava3.subjects._
import scala.collection.mutable.HashMap
import io.reactivex.rxjava3.internal.functions.Functions
Flowables.orderedMerge(
    List(
        Flowable.just(3, 5),
        Flowable.just(2, 4, 6)
    ).toIterable.asJava, 
    Functions.naturalComparator[Int]()
    ).subscribe(i=>println(i))

But after printing 2, it throws an exception:

java.lang.NoSuchMethodError: io.reactivex.rxjava3.internal.subscribers.InnerQueuedSubscriber.requestOne()V
  hu.akarnokd.rxjava3.operators.BasicMergeSubscription.drain(BasicMergeSubscription.java:268)
  hu.akarnokd.rxjava3.operators.BasicMergeSubscription.innerComplete(BasicMergeSubscription.java:161)
  io.reactivex.rxjava3.internal.subscribers.InnerQueuedSubscriber.onSubscribe(InnerQueuedSubscriber.java:69)
  io.reactivex.rxjava3.internal.operators.flowable.FlowableFromArray.subscribeActual(FlowableFromArray.java:39)
  io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15750)
  io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15696)
  hu.akarnokd.rxjava3.operators.BasicMergeSubscription.subscribe(BasicMergeSubscription.java:79)
  hu.akarnokd.rxjava3.operators.FlowableOrderedMerge.subscribeActual(FlowableOrderedMerge.java:94)
  io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15750)
  io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15686)
  io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15618)
  ammonite.$sess.cmd63$.<clinit>(cmd63.sc:1)

The weird part is that it prints the first ordered item, but not the following ones.

SingleConsumers.subscribeAutoDispose does not always remove the Disposable from the CompositeDisposable

I'm not sure if I'm actually just misunderstanding how subscribeAutoDispose is supposed to work or if this is a bug - basically, if I use SingleConsumers.subscribeAutoDispose, and the Single I pass in completes via onSuccess, it does not get removed from the CompositeDisposable. It seems like this is because DisposableAutoReleaseMultiObserver.onSuccess does not call removeSelf(). I think MaybeConsumers.subscribeAutoDispose has the same behavior. But again, I'm not sure if this is a real bug or I'm just misunderstanding something.

Operator mapFilter

Implement a hybrid mapFilter operator:

mapFilter(BiConsumer<T, MapFilterEmitter<R>> consumer);

interface MapFilterEmitter<R> {
    void onNext(R value);
    void onError(Throwable ex);
    void onComplete();
}

I'm not sure yet if an explicit onDrop would be required or if it is enough that a lack of an onXXX call means the source value is ignored.
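
To make the proposed contract concrete, here is a sketch that applies it to a plain Iterable instead of a reactive source, under the assumption that not calling any onXXX method drops the value. `MapFilterSketch` and `CollectingEmitter` are hypothetical; only the `MapFilterEmitter` shape comes from the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, non-reactive illustration of the proposed mapFilter contract.
final class MapFilterSketch {

    interface MapFilterEmitter<R> {
        void onNext(R value);
        void onError(Throwable ex);
        void onComplete();
    }

    private static final class CollectingEmitter<R> implements MapFilterEmitter<R> {
        final List<R> values = new ArrayList<>();
        boolean done;
        Throwable error;

        @Override public void onNext(R value) { if (!done) values.add(value); }
        @Override public void onError(Throwable ex) { if (!done) { done = true; error = ex; } }
        @Override public void onComplete() { done = true; }
    }

    static <T, R> List<R> mapFilter(Iterable<T> source,
            BiConsumer<T, MapFilterEmitter<R>> consumer) {
        CollectingEmitter<R> emitter = new CollectingEmitter<>();
        for (T item : source) {
            // No onXXX call inside the consumer means the item is dropped
            consumer.accept(item, emitter);
            if (emitter.done) {
                break;                    // onError or onComplete stops the sequence
            }
        }
        if (emitter.error != null) {
            throw new IllegalStateException(emitter.error);
        }
        return emitter.values;
    }
}
```

The sketch does not enforce at most one onNext call per source value; whether the real operator should is left open, as in the proposal.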

Add indexed transform operator to ParallelFlowable

Signature:

ParallelTransformer<T, R> transformRail(BiFunction<Flowable<T>, Integer, Publisher<R>> mapper)

Given the parallelism of the ParallelFlowable, the function will be called for each rail indexed 0..n-1 to provide a mapping for that particular rail.

Considerations:

  • The input Flowables will be single-subscriber only.
  • The implementation should not buffer but has to be careful when relaying signals because:
    • the returned Publisher may not be connected to the input Flowable for a time or at all.
    • the input Flowable may terminate before the downstream gets to subscribing to it.

See also https://github.com/akarnokd/RxJavaMicroprofileRS/blob/master/src/main/java/hu/akarnokd/rxjava3/mprs/DeferredProcessor.java

Add a Valve operator

Similar to the operator you created for Rx 1: https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

Most reactive extensions for other languages can achieve something similar by combining various operators, but I'm struggling to recreate it in RxJava. I'm also struggling to convert the above operator to Rx 2.x for my own use.

The goal is to be able to open or close the valve using a boolean value. When open, the stream flows as normal; when closed, items are buffered until the valve is opened again, at which point all buffered items are emitted. I can see this being quite useful for a number of reasons, so I was hoping it could become officially supported.
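
The requested behaviour can be sketched without any reactive machinery as a gate in front of a consumer. This is only an illustration of the semantics, not a port of the linked Rx 1 operator; `ValveSketch`, `onNext` and `setOpen` are hypothetical names, and backpressure, termination and re-entrant calls from the downstream are deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical gate: passes items through when open, buffers them when closed.
final class ValveSketch<T> {

    private final Consumer<? super T> downstream;
    private final Queue<T> buffer = new ArrayDeque<>();
    private boolean open;

    ValveSketch(Consumer<? super T> downstream, boolean initiallyOpen) {
        this.downstream = downstream;
        this.open = initiallyOpen;
    }

    synchronized void onNext(T item) {
        if (open) {
            downstream.accept(item);
        } else {
            buffer.add(item);             // held back until the valve reopens
        }
    }

    synchronized void setOpen(boolean open) {
        this.open = open;
        if (open) {
            // Flush everything buffered while closed, in arrival order
            T item;
            while ((item = buffer.poll()) != null) {
                downstream.accept(item);
            }
        }
    }
}
```

Both methods are synchronized so that an item arriving while the buffer is being flushed cannot overtake the buffered ones.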

partialCollect() "locks" when downstream demand goes down to zero and items are already consumed from upstream

There seems to be a corner case that "locks" the partialCollect() operator. I am not sure whether it's the implementation or the example provided in the documentation that is wrong.

Here's the example from the documentation page, slightly modified, that triggers the behaviour:

     Flowable.just("ab|cdef", "gh|ijkl|", "mno||pqr|s", "|", "tuv|xy", "|z")
            .compose(FlowableTransformers.partialCollect(
              new Consumer<PartialCollectEmitter<String, Integer, StringBuilder, String>>()
              {
                @Override
                public void accept(
                  PartialCollectEmitter<String, Integer, StringBuilder, String> emitter)
                  throws Exception
                {
                  Integer idx = emitter.getIndex();
                  if (idx == null) {
                    idx = 0;
                  }
                  StringBuilder sb = emitter.getAccumulator();
                  if (sb == null) {
                    sb = new StringBuilder();
                    emitter.setAccumulator(sb);
                  }

                  if (emitter.demand() != 0) {

                    boolean d = emitter.isComplete();
                    if (emitter.size() != 0) {
                      String str = emitter.getItem(0);

                      int j = str.indexOf('|', idx);

                      if (j >= 0) {
                        sb.append(str.substring(idx, j));
                        emitter.next(sb.toString());
                        sb.setLength(0);
                        idx = j + 1;
                      } else {
                        sb.append(str.substring(idx));
                        emitter.dropItems(1);
                        idx = 0;
                      }
                    } else if (d) {
                      if (sb.length() != 0) {
                        emitter.next(sb.toString());
                      }
                      emitter.complete();
                      return;
                    }
                  }

                  emitter.setIndex(idx);
                }
              }, Functions.emptyConsumer(), 128))
            //MODIFICATION STARTS HERE
            .doOnRequest(l -> System.out.println("request(" + l + ")"))
            .doOnNext(s -> System.out.println("Item: " + s))
            .observeOn(Schedulers.io(), false, 1)
            .test()
            .awaitDone(10, TimeUnit.SECONDS)
            .assertResult(
              "ab",
              "cdefgh",
              "ijkl",
              "mno",
              "",
              "pqr",
              "s",
              "tuv",
              "xy",
              "z"
            );

The output of the above test is:

request(1)
Item: ab
request(1)
Exception in thread "main" java.lang.AssertionError: Value count differs; expected: 10 [ab, cdefgh, ijkl, mno, , pqr, s, tuv, xy, z] but was: 1 [ab] (latch = 1, values = 1, errors = 0, completions = 0, timeout!, disposed!)

The documentation of the operator states:

The operator will call the handler again if it detects an item has been produced, therefore, there is no need to exhaustively process all source items on one call (which may not be possible if only partial data is available).

But if you don't produce any item, then the handler will not be called again, and in the scenario above the operator stays "locked", so it seems that there IS a need to exhaustively process all source items on one call.

RxJavaAssemblyTracking impacts execution and produces different results

Recently I came across a weird bug which occurred only in the debug version of the app, which has assembly tracking enabled.
It occurs when the combination replay(1).refCount() is used: the first subscription after dispose does not emit anything, while the second works as expected.

I was able to isolate the problem and narrow it down to a simple example in Java:

public class Showcase {
    public static void main(String[] args) throws InterruptedException {
        RxJavaAssemblyTracking.enable();

        System.out.println("start");

        PublishSubject<String> stringsEmitter = PublishSubject.create();

        Observable<String> combineSource = stringsEmitter
                .replay(1)
                .refCount();

        CompositeDisposable c = new CompositeDisposable();

        c.add(
                combineSource
                        .subscribeOn(Schedulers.io())
                        .subscribe((string) -> System.out.println("A1:" + string))
        );
        c.add(
                combineSource
                        .subscribeOn(Schedulers.io())
                        .subscribe((string) -> System.out.println("B1:" + string))
        );

        stringsEmitter.onNext("s1");

        Thread.sleep(100);
        c.clear();

        combineSource.subscribeOn(Schedulers.io())
                .subscribe((string) -> System.out.println("A2:" + string));

        combineSource.subscribeOn(Schedulers.io())
                .subscribe((string) -> System.out.println("B2:" + string));

        Thread.sleep(100);

        stringsEmitter.onNext("s2");

        Thread.sleep(1000);

        stringsEmitter.onNext("s3");
        stringsEmitter.onNext("s4");
    }
}

Output I get when the assembly tracker is disabled (as expected):

start
A2:s2
B2:s2
A2:s3
B2:s3
A2:s4
B2:s4

Output when the assembly tracker is enabled:

start
B2:s2
B2:s3
B2:s4

RxJava version 2.2.7
RxJava2Extensions 0.20.6

FlowableOrderedMerge for ParallelFlowables

It would be nice to have an orderedMerge method which can be used with ParallelFlowables. If I'm correct, FlowableOrderedMerge could easily be adjusted to provide such functionality by simply changing the constructor from:

    FlowableOrderedMerge(Publisher<T>[] sources, Iterable<? extends Publisher<T>> sourcesIterable,
            Comparator<? super T> comparator,
            boolean delayErrors, int prefetch) {

to

    FlowableOrderedMerge(ParallelFlowable<T> source,
            Comparator<? super T> comparator,
            boolean delayErrors, int prefetch) {

and changing the subscribeActual method to:

    @Override
    protected void subscribeActual(Subscriber<? super T> s) {
        final MergeCoordinator<T> parent = new MergeCoordinator<T>(s, comparator, this.source.parallelism(), prefetch, delayErrors);
        s.onSubscribe(parent);
        this.source.subscribe(parent.subscribers);
    }

I don't know how you want to integrate the change to support both cases but it would be very useful to have :)
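
Independent of whether the inputs are separate Publishers or the rails of a ParallelFlowable, the merge step is a k-way merge of already-sorted inputs by a comparator. A sketch of that step over plain iterables follows, to show what an ordered merge is expected to produce; it is not the MergeCoordinator logic, and `OrderedMergeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper: k-way merge of individually sorted, finite inputs.
final class OrderedMergeSketch {

    // The current smallest unread value of one input plus the rest of that input.
    private static final class Head<T> {
        final T value;
        final Iterator<? extends T> rest;

        Head(T value, Iterator<? extends T> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    static <T> List<T> orderedMerge(List<? extends Iterable<? extends T>> sources,
            Comparator<? super T> comparator) {
        PriorityQueue<Head<T>> heads =
                new PriorityQueue<>((a, b) -> comparator.compare(a.value, b.value));
        for (Iterable<? extends T> source : sources) {
            Iterator<? extends T> it = source.iterator();
            if (it.hasNext()) {
                heads.add(new Head<>(it.next(), it));
            }
        }
        List<T> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            Head<T> smallest = heads.poll();
            merged.add(smallest.value);
            if (smallest.rest.hasNext()) {
                // Replace the consumed head with the next value of the same input
                heads.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }
}
```

The output is globally sorted only if each input is itself sorted by the same comparator, and the relative order of equal elements from different inputs is not guaranteed in this sketch.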

Add switchNextIfEmpty()

Add the following operators:

  • Flowables.switchNextIfEmptyArray(Publisher<T>... sources)
  • Flowables.switchNextIfEmpty(Iterable<Publisher<T>> sources)
  • Flowables.switchNextIfEmpty(Publisher<Publisher<T>> sources)
  • FlowableTransformers.switchNextIfEmptyArray(Publisher<T>... sources)
  • FlowableTransformers.switchNextIfEmpty(Iterable<Publisher<T>> sources)
  • FlowableTransformers.switchNextIfEmpty(Publisher<Publisher<T>> sources)
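
The issue does not spell out the semantics. Assuming they are analogous to switchIfEmpty applied repeatedly (try each source in order and relay the first one that turns out to be non-empty), the behaviour can be sketched over plain iterables as follows. `SwitchNextIfEmptySketch` is a hypothetical name and the code is not a reactive implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper; assumes "switch to the next source if this one is empty".
final class SwitchNextIfEmptySketch {

    static <T> List<T> switchNextIfEmpty(Iterable<? extends Iterable<? extends T>> sources) {
        for (Iterable<? extends T> source : sources) {
            Iterator<? extends T> it = source.iterator();
            if (it.hasNext()) {
                // First non-empty source wins; later sources are never consulted
                List<T> result = new ArrayList<>();
                while (it.hasNext()) {
                    result.add(it.next());
                }
                return result;
            }
        }
        return new ArrayList<>();         // every source was empty
    }
}
```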
