Comments (10)
Why don't you use the available operators to switch between modes?
datasource
.to(ParallelFlowable::from)
.runOn(Schedulers.computation())
.doOnNext(v -> { })
.sequential()
.observeOn(Schedulers.single())
.buffer(1000)
.doOnNext(list -> { })
.subscribe();
from rxjavaextensions.
Wonderful !
Very impressive power !
I forgot to say that the first computation is IO intensive. It's better to use another type of scheduler ?
(I'm still studying rxjava.)
from rxjavaextensions.
As you wish, the parallelism level is determined by the from
parameter and not the Scheduler
:
.to(f -> ParallelFlowable.from(f, 3))
.runOn(Schedulers.io())
from rxjavaextensions.
... and in this way the first doOnNext lambda is not executed when the second one is ?
from rxjavaextensions.
Stages will run in parallel, why do you have that requirement? RxJava is about executing in a flow and not blocking out things; backpressure is there to limit memory usage if that's the problem.
from rxjavaextensions.
I need to alternate the two stages.
Stage one is read only
Stage two is when results are saved
I need to isolate them, the application is much more simple and robust this way.
What I need is something like a blocking-output-buffer for the doOnNext operator. When it's full it blocks the doOnNext until the buffer is consumed
from rxjavaextensions.
The individual items should be isolated from each other anyway.
What I need is something like a blocking-output-buffer for the doOnNext operator. When it's full it blocks the doOnNext until the buffer is consumed
This makes no sense in reactive.
Here is an alternative that works on a chunk of 1000 elements:
datasource
.window(1000)
.flatMap(w ->
ParallelFlowable.from(w)
.runOn(Schedulers.io())
.doOnNext(item -> { })
.sequential()
.toList()
.doOnNext(list -> { })
, 1)
.subscribe();
Here processing the items parallel happens before collecting them into a list and once that happens, you can process the list and there won't be any parallel execution from the first doOnNext
nor there won't be another window started before you have finished with the list.
from rxjavaextensions.
OK that is a solution.
So it's not possible without buffering at start.
What's wrong with an operator that in loop do the following ?
do something with item,
collect item
emit items when the buffer is full
from rxjavaextensions.
What's wrong with an operator that in loop do the following ?
That's the imperative thinking about processing data. In reactive, there is no explicit loop but flow of data, aggregation, element processing, etc.
from rxjavaextensions.
ok... I'm still thinking in imperative mode.
from rxjavaextensions.
Related Issues (20)
- RxJavaAssemblyException Doesn't Show Stack Trace HOT 2
- Add ability to disable RxJavaAssemblyTracking in specific observable HOT 9
- Remove deprecated extensions HOT 1
- Dependabot couldn't authenticate with https://repo.spring.io/libs-snapshot
- Union of multiple observables HOT 1
- Is it feasible to create a Java Agent similar to Reactor's Debug Agent? HOT 2
- FlowableOrderedMerge for ParallelFlowables HOT 4
- expand() delays errors when using DEPTH_FIRST HOT 2
- Proposal: Executor - Scheduler interop HOT 8
- Proposal: Add support for conditionals in Single, Maybe and Completable HOT 1
- SingleConsumers.subscribeAutoDispose does not always remove the Disposable from the CompositeDisposable HOT 2
- RxJavaAssemblyTracking impacts execution and produce different results HOT 2
- partialCollect() "locks" when downstream demand goes down to zero and items are already consumed from upstream HOT 5
- Seperate out RxJavaAssemblyTracking HOT 1
- Specify exact buffer size to Unicast-/DispatchWorkSubject HOT 6
- Flowables.orderedMerge NoSuchMethodError HOT 2
- Add indexed transform operator to ParallelFlowable
- [3.x] RxJavaAssemblyTracking stack trace is not printed HOT 6
- Dependabot couldn't authenticate with http://repo.spring.io/libs-snapshot
- Add switchNextIfEmpty() HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from rxjavaextensions.