davidmoten / rxjava2-extras Goto Github PK
View Code? Open in Web Editor NEWUtilities for use with RxJava 2
License: Apache License 2.0
Utilities for use with RxJava 2
License: Apache License 2.0
Hi.
I took on the challenge of writing a bridge that turns String
s in a Flowable
into bytes readable via the Java InputStream
API (blockingly):
Since I don't do I/O in RxJava2Extensions, I'd like to ask if this project would be interested in this type of bridge?
Hi,
I was trying to use RetryWhen.retryWhenInstanceOf(IOException.class)
but it would never trigger for classes that extend IOException
. Is that the intended behaviour?
By looking at the name I was assuming that it should trigger for any Object
that inherits whatever class I have passed.
commit version- d0315b6
Describe the bug
It is possible to encounter an unexpected crash due to a bad thread interleaving. The bad interleaving may occur due to sharing one variable (variable name is cancelled). For example, I saw at Line 53 and at Line 66 of FlowableRepeat.java, they share the same variable. Of these two accesses one method is writing on that variable at Line 66 and another Line at 53 reads the value of that variable. However, this may cause a crash anytime if multiple threads interleave in different ways.
Expected behavior
No error or sudden crash of the program is expected.
Additional context
When I investigated the FlowableRepeat class, I found that the cancelled variable is used on Line 53 and Line 66. Of these two access, one is a write operation. Hence, any unexpected crash or inconsistency may occur if multiple threads interleave.
Environment(please complete the following information):
I ran the test on an Ubuntu 20.04 LTS machine using OpenJDK 1.8.0_312.
One of the operators that was great in rxjava-extras was OrderedMerge.
Is this something that will be added to rxjava2-extras or is there a better way to do this now?
If this is to be added I could try to create a PR.
when I use this library in my gradle android project, the build fails:
:app:transformClassesWithNewClassShrinkerForDebug
com/github/davidmoten/rx2/internal/flowable/buffertofile/MemoryMappedFile references unknown class: sun/misc/Unsafe
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder$2$1 references unknown class: com/esotericsoftware/kryo/Kryo
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder$5 references unknown class: com/esotericsoftware/kryo/io/Input
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder$5 references unknown class: com/esotericsoftware/kryo/Kryo
com/github/davidmoten/rx2/internal/flowable/buffertofile/UnsafeAccess references unknown class: sun/misc/Unsafe
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder references unknown class: com/esotericsoftware/kryo/io/Input
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder references unknown class: com/esotericsoftware/kryo/Kryo
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder$4 references unknown class: com/esotericsoftware/kryo/io/Input
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder$2$1 references unknown class: com/esotericsoftware/kryo/io/Output
com/github/davidmoten/rx2/flowable/Serialized references unknown class: com/esotericsoftware/kryo/Kryo
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder$3 references unknown class: com/esotericsoftware/kryo/io/Input
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder$3.call:()Lcom/esotericsoftware/kryo/io/Input; references unknown class member: com/esotericsoftware/kryo/io/Input.<init>:(Ljava/io/InputStream;I)V
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder$1 references unknown class: com/esotericsoftware/kryo/io/Output
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder$2 references unknown class: com/esotericsoftware/kryo/io/Output
com/github/davidmoten/rx2/flowable/Serialized.kryo:()Lcom/github/davidmoten/rx2/flowable/Serialized$KryoBuilder; references unknown class member: com/esotericsoftware/kryo/Kryo.<init>:()V
com/github/davidmoten/rx2/internal/flowable/buffertofile/MemoryMappedFile references unknown class: sun/nio/ch/FileChannelImpl
com/github/davidmoten/rx2/flowable/Serialized$KryoBuilder$1.call:()Lcom/esotericsoftware/kryo/io/Output; references unknown class member: com/esotericsoftware/kryo/io/Output.<init>:(Ljava/io/OutputStream;I)V
// java String.split() works as expected
assertEquals(1, "Hello World\n".split("\n").length);
TestSubscriber<String> subscriber = Strings
.from(new StringReader("Hello World\n"))
.compose(Strings.split("\n"))
.test();
subscriber.assertValueCount(1); // Fails with a result of '2'
subscriber.assertValues("Hello World"); // Fails with a result of ['Hello World', '']
Hello,
We tried running your project and discovered that it contains some flaky tests (i.e., tests that nondeterministically pass and fail). We found these tests to fail more frequently when running them on certain machines of ours.
To prevent others from running this project and its tests in machines that may result in flaky tests, we suggest adding information to the README.md file indicating the minimum resource configuration for running the tests of this project as to prevent observation of test flakiness.
If we run this project in a machine with 1cpu and 1gb ram, we observe flaky tests. We found that the tests in this project did not have any flaky tests when we ran it on machines with 2cpu and 2gb ram.
Here is a list of the tests we have identified and their likelihood of failure on a system with less than the recommended 2 CPUs and 2 GB RAM.
FROM maven:3.5.4-jdk-11
WORKDIR /home/
RUN git clone https://github.com/davidmoten/rxjava2-extras && \
cd rxjava2-extras && \
git checkout 3f35966fb5cd3b7a4d4ada38165de3f9fae7c6d6
WORKDIR /home/rxjava2-extras
RUN mvn install -DskipTests
ENTRYPOINT ["mvn", "test", "-fn"]
Build the image:
$> mkdir tmp
$> cp Dockerfile tmp
$> cd tmp
$> docker build -t rxjava2-extras . # estimated time of build 3m
Running:
this configuration likely prevents flakiness (no flakiness in 10 runs)
$> docker run --rm --memory=2g --cpus=4 --memory-swap=-1 rxjava2-extras | tee output.txt
$> grep "Failures:" output.txt # checking results
checking results
this other configuration –similar to the previous– can’t prevent flaky tests (observation in 10 runs)
$> docker run --rm --memory=1g --cpus=0.5 --memory-swap=-1 rxjava2-extras | tee output2.txt
$> grep "Failures:" output2.txt # checking results
Hey David,
Do you think this project should only support Flowable operators? Or will there be plans for Observable support as well?
Thomas N.
Hi.
I like rxjava2-extras and expect rxjava3-extras.
Do you have any plans?
Thanks.
As per doco on README.md I'd love to get new operators minRequest
and maxRequest
reviewed for correctness. Thanks!
Relevant classes:
The semantics would be "Retry when the count of consecutive errors has not exceeded a certain value." The error count would be reset upon a successful onNext
call.
It could be done with a builder function like .consecutiveErrorCountLessThan( int n )
. (If 2+ billion consecutive errors isn't enough, make the parameter a long
:-)
I've implemented something like this as a composable ObservableTransformer
. It also accepts an optional list of the types of Exception
to be counted. Caveat: it's not well tested yet.
public class RetryLimiter<T> implements ObservableTransformer<T,T> {
private final static String TAG = RetryLimiter.class.getSimpleName();
private int errorCount = 0;
private int maxErrorCount;
private List<Class> countedExceptions; // note errors not in this list (if defined) are not counted and will continue retries
// constructor passes max tolerated consecutive errors (count is rezeroed upon a successful emission)
public RetryLimiter( int maxErrors ) {
if( maxErrors < 0 ) throw new IllegalArgumentException( "maxErrors cannot be < 0" );
maxErrorCount = maxErrors;
}
// constructor that accepts a list of the types of exceptions to count (others are ignored)
public RetryLimiter( int maxErrors, List<Class> exceptions ) {
this( maxErrors );
if( exceptions != null ) {
for( Class c : exceptions ) { // check that the supplied list contains only Throwable (sub)classes
if( !Throwable.class.isAssignableFrom( c ) ) { // is c a Throwable or subclass of Throwable?
throw new IllegalArgumentException( "List can only contain class Throwable or its subclasses" );
}
}
countedExceptions = exceptions;
} else {
throw new NullPointerException( "If supplied, Exception Class list cannot be null" );
}
}
@Override
public ObservableSource<T> apply( Observable<T> upstream ) {
return upstream
.doOnError( err -> {
if( countedExceptions != null ) {
for( Class t : countedExceptions ) {
if( err.getClass().equals( t ) ) {
errorCount++;
break; // don't count more than once
}
}
} else { // null list of counted Exceptions
errorCount++;
}
if( DEBUG ) Log.d( TAG, "Consecutive error #" + errorCount + ": " + err.toString() );
} )
.doOnNext( next -> errorCount = 0 )
.retryUntil( () -> errorCount > maxErrorCount );
}
}
Also, what about including an option for a general Predicate
function to decide whether or not to retry? I realize this is essentially the same as the standard .retryUntil( Predicate )
but it could be a useful addition: .isTrue( Predicate p )
. Not to gild the lily, but it could perhaps be two functions: .and( Predicate p )
and .or( Predicate p )
. Arguments to the Predicate
are TBD but would presumably include the Throwable
.
RetryWhen.retryWhenInstanceOf has vararg parameter with generic type, this produces Java warning in code that is using this method. https://docs.oracle.com/javase/tutorial/java/generics/nonReifiableVarargsType.html#vulnerabilities.
It should be enough to add @SafeVarargs to RetryWhen.retryWhenInstanceOf method
Also possible workaround is to use retryIf(e -> e instanceOf X) instead
Changes in rxjava 2.0.6 and 2.0.7 are causing rxjava2-extras test failures:
2.0.6:
testDoesNotTwoErrorsIfUpstreamDoesNotHonourCancellationImmediately(com.github.davidmoten.rx2.internal.flowable.FlowableCollectWhileTest)
2.0.7
testDoesNotTwoErrorsIfUpstreamDoesNotHonourCancellationImmediately(com.github.davidmoten.rx2.internal.flowable.FlowableCollectWhileTest)
testRebatchRequestsMinEqualsMaxDontConstrainFirstRequest(com.github.davidmoten.rx2.internal.flowable.FlowableMinRequestTest): expected:<[5, 5]> but was:<[5, 5, 5]>
testConstrainedFirstRequest(com.github.davidmoten.rx2.internal.flowable.FlowableMinRequestTest): expected:<[2, 2, 2, 2, 2]> but was:<[2, 2, 2, 2, 2, 2]>
testUnsubscribe(com.github.davidmoten.rx2.internal.flowable.FlowableMatchTest)
Reflection blacklist Lsun/nio/ch/FileChannelImpl;->map0 use(s):
Lcom/github/davidmoten/rx2/internal/flowable/buffertofile/MemoryMappedFile;->()V
as follow up to #7, add some goodness to the README for these operators, cc @thomasnield
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.