pivovarit / parallel-collectors
Parallel Collectors is a toolkit easing parallel collection processing in Java using Stream API.
License: Apache License 2.0
toCollectionInParallel(Supplier<Collection> collection, Executor executor)
toListInParallel
To parallelize some side effects, I wrote the following code, which works as expected on my local machine:
public static void main( String[ ] args ) {
var executor = Executors.newFixedThreadPool(10 );
try {
var list = List.of( "A", "B" );
list.stream( )
.collect( ParallelCollectors.parallel( TestCase::processRecord, executor) )
.join( );
} finally {
executor.shutdown( );
}
}
private static String processRecord( String record ) {
System.out.println( Thread.currentThread( ).getName( ) );
return record;
}
$ docker run --rm testcase
pool-1-thread-2
pool-1-thread-1
However, it took me a while to figure out why it doesn't work when deployed on our cluster (Mesos & K8S). Eventually, I figured out that since release 2.3.0, parallel-collectors behaves inconsistently between parallel=1 and parallel>1.
$ docker run --rm --cpuset-cpus=0 testcase
$ docker run --rm --cpuset-cpus=0,1 testcase
$ docker run --rm --cpuset-cpus=0,1,2 testcase
pool-1-thread-2
pool-1-thread-1
When parallel=1, the mapping function is only invoked when the stream returned by parallel().join() is consumed by a terminal operation (and some, like count, won't invoke it at all, as the mapping function can be optimized away).
public static void main( String[ ] args ) {
var executor = Executors.newFixedThreadPool( 10 );
try {
var list = List.of( "A", "B" );
System.out.println( "count" );
list.stream( )
.collect( ParallelCollectors.parallel( TestCase::processRecord, executor ) )
.join( )
.count( );
System.out.println( "toList" );
list.stream( )
.collect( ParallelCollectors.parallel( TestCase::processRecord, executor ) )
.join( )
.collect( Collectors.toList( ) );
} finally {
executor.shutdown( );
}
}
$ docker run --cpuset-cpus=0 --rm testcase
count
toList
main
main
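The count/toList difference in the output above is consistent with how lazy java.util.stream pipelines behave in general. The following is a minimal JDK-only sketch (it does not use parallel-collectors; the class and method names are made up for illustration): a mapper attached with map() does not run until a terminal operation needs the elements, and on JDK 9+ count() is allowed to skip it when the size can be computed from the source.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; not part of parallel-collectors.
final class LazyStreamDemo {

    // Builds the pipeline only; no terminal operation is invoked.
    static int mapperCallsWithoutTerminalOperation() {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> pending = List.of("A", "B").stream()
            .map(s -> { calls.incrementAndGet(); return s; });
        return calls.get();
    }

    // collect(toList()) has to produce every element, so the mapper runs once per element.
    static int mapperCallsAfterCollect() {
        AtomicInteger calls = new AtomicInteger();
        List.of("A", "B").stream()
            .map(s -> { calls.incrementAndGet(); return s; })
            .collect(Collectors.toList());
        return calls.get();
    }

    // count() may compute the size directly from the source and skip the mapper (JDK 9+).
    static int mapperCallsAfterCount() {
        AtomicInteger calls = new AtomicInteger();
        List.of("A", "B").stream()
            .map(s -> { calls.incrementAndGet(); return s; })
            .count();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("no terminal operation: " + mapperCallsWithoutTerminalOperation());
        System.out.println("after collect: " + mapperCallsAfterCollect());
        System.out.println("after count: " + mapperCallsAfterCount());
    }
}
```

Side effects placed in the mapper are therefore only reliable when the resulting stream is consumed by an operation that actually traverses the elements.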
When parallel>1, mapping is always performed eagerly on the provided executor (no need to consume the stream).
This seems to be a bug, as the Javadoc says that parallel computation will be performed on a custom executor, which is not true in this case. It is also dangerous, as it seems too easy to write the same faulty code as I did.
I would propose reverting to the 2.2.0 behaviour.
parallelToStream(Executor executor)
parallelToStream(Executor executor, int parallelism)
parallelToStreamOrdered(Executor executor)
parallelToStreamOrdered(Executor executor, int parallelism)
In case an exception gets thrown, collectors should immediately terminate operations and return a future completed exceptionally.
@Test
void shouldCollectToCollectionAndShortCircuitOnException() {
// given
executor = threadPoolExecutor(1);
LongAdder counter = new LongAdder();
try {
IntStream.generate(() -> 42).boxed().limit(10)
.map(i -> supplier(() -> {
counter.increment();
throw new IllegalArgumentException();
}))
.collect(parallelToList(executor, 1))
.join();
} catch (Exception e) {
}
assertThat(counter.longValue()).isOne();
}
public static <T, R> Collector<T, List<CompletableFuture<R>>, CompletableFuture<List<R>>> toListInParallel(Function<T, R> mapper, Executor executor)
So that users are not forced to implement an intermediate operation of creating Supplier instances.
Describe the bug
Upon upgrading parallel-collectors from version 2.5.0 to 2.6.0, we've observed an increase in thread consumption.
We are using an org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor with the following parameters: corePoolSize set to 1500, maxPoolSize set to 3000, and queueCapacity set to 100.
While on version 2.5.0, we never observed corePoolSize exceeding 1500.
However, upon upgrading to version 2.6.0, corePoolSize increases from 1500 a few seconds after application startup. No other changes were made.
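As background for the numbers above: a java.util.concurrent.ThreadPoolExecutor only creates threads beyond its core size once its bounded queue is full. The sketch below is JDK-only and uses small stand-in values (2/4/2 instead of 1500/3000/100); it assumes the report refers to the live pool size growing past the core size, and it does not attempt to reproduce the library's behaviour. The class name is made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of parallel-collectors or Spring.
final class PoolGrowthDemo {

    // Returns {pool size while the queue is full, pool size after one more task}.
    static int[] poolSizes() {
        CountDownLatch release = new CountDownLatch(1);
        // Scaled-down stand-in for corePoolSize=1500, maxPoolSize=3000, queueCapacity=100.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2));
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            // Two tasks occupy the core threads, two more fill the queue.
            for (int i = 0; i < 4; i++) {
                pool.execute(blocked);
            }
            int sizeWithFullQueue = pool.getPoolSize();
            // The queue cannot take this task, so the pool adds a non-core thread.
            pool.execute(blocked);
            int sizeAfterOverflow = pool.getPoolSize();
            return new int[] {sizeWithFullQueue, sizeAfterOverflow};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes();
        System.out.println("with full queue: " + sizes[0] + ", after overflow: " + sizes[1]);
    }
}
```

If this reading is right, a pool growing past its core size would suggest that more tasks are in flight at once than core threads plus queue slots can absorb.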
To Reproduce
I've prepared a unit test that passes on the 2.5.0 tag but fails on the 2.6.0 tag and the master branch:
@Test
void shouldExecuteParallelStreamsSequentially() {
int tasks = 2;
ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(tasks,
tasks,
0L,
MILLISECONDS,
new LinkedBlockingQueue<>(tasks));
Stream.of(210, 200, 160, 180)
.collect(parallelToStream(i -> returnWithDelay(i, ofMillis(i)), myExecutor, tasks))
.collect(toList());
// 2.6.0 and master fail from here
Stream.of(210, 200, 160, 180)
.collect(parallelToStream(i -> returnWithDelay(i, ofMillis(i)), myExecutor, tasks))
.collect(toList());
}
Expected behavior
I expect the unit test to pass on the master branch and the 2.6.0 tag.
Thank you in advance.
Hi,
It's a very nice project, and I want to use it.
I have a question:
can I use the filtering and flatMap functionality that is available in streams with parallel-collectors?
Let's add a method returning a simple collector that collects a stream of futures into a single future.
public static void main(String[] args) {
CompletableFuture<List<Integer>> collect = Stream.of(1, 2, 3)
.map(i -> CompletableFuture.supplyAsync(() -> i))
.collect(toFuture());
}
public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> toFuture() {
return Collectors.collectingAndThen(Collectors.toList(), list -> {
CompletableFuture<List<T>> future = CompletableFuture
.allOf(list.toArray(new CompletableFuture[0]))
.thenApply(__ -> list.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
for (CompletableFuture<T> f : list) {
f.whenComplete((integer, throwable) -> {
if (throwable != null) {
future.completeExceptionally(throwable);
}
});
}
return future;
});
}
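To illustrate what the proposed collector buys, here is a self-contained sketch that repeats the toFuture() shape from above (with explicit type arguments added so it compiles on its own) and exercises it in two cases: all futures succeed, and one future fails while another never completes. The class and helper method names are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class wrapping the collector proposed above.
final class ToFutureDemo {

    static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> toFuture() {
        return Collectors.collectingAndThen(
            Collectors.<CompletableFuture<T>>toList(),
            (List<CompletableFuture<T>> list) -> {
                CompletableFuture<List<T>> future = CompletableFuture
                    .allOf(list.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> list.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
                // Propagate the first failure without waiting for the remaining futures.
                for (CompletableFuture<T> f : list) {
                    f.whenComplete((value, throwable) -> {
                        if (throwable != null) {
                            future.completeExceptionally(throwable);
                        }
                    });
                }
                return future;
            });
    }

    // All inputs are already completed, so the combined future holds the full list.
    static List<Integer> collectsAll() {
        return Stream.of(1, 2, 3)
            .map(i -> CompletableFuture.completedFuture(i))
            .collect(ToFutureDemo.<Integer>toFuture())
            .join();
    }

    // One input fails while another is still pending; the result fails right away.
    static boolean failsWithoutWaitingForPending() {
        CompletableFuture<Integer> neverCompleted = new CompletableFuture<>();
        CompletableFuture<Integer> failing = new CompletableFuture<>();
        CompletableFuture<List<Integer>> result = Stream.of(neverCompleted, failing)
            .collect(ToFutureDemo.<Integer>toFuture());
        failing.completeExceptionally(new IllegalStateException("boom"));
        return result.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(collectsAll());
        System.out.println(failsWithoutWaitingForPending());
    }
}
```

With CompletableFuture.allOf alone, the second case would stay pending until every input completed; the extra whenComplete callbacks are what make the combined future fail early.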
toListInParallel(Executor executor, int parallelism)
toSetInParallel(Executor executor, int parallelism)
toCollectionInParallel(Supplier<Collection<T>> collection, Executor executor, int parallelism)
<T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Collector<R, ?, RR> c, Function<T, R> m, Executor e, int parallelism)
<T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> m, Executor e, int parallelism)
A signature like this would allow reducing the API surface by getting rid of the *toList/toMap/toSet factories and replacing them with one API that allows users to pass their own Java Collectors.
So, instead of:
.collect(parallelToSet(i -> i + 2, executor, 2));
.collect(parallelToList(i -> i + 2, executor, 7));
One would expect:
CF<Set<Integer>> r = ...collect(parallel(toSet(), i -> i + 2, executor, 2));
CF<List<Integer>> r = ...collect(parallel(toList(), i -> i + 2, executor, 2));
The Collector<R, ?, RR> parameter could be omitted, which would result in a CF containing unprocessed Stream instances:
CF<Stream<Integer>> r = ...collect(parallel(i -> i + 2, executor, 2));
The difference for users is subtle, but it would greatly simplify the implementation.
Blocking ParallelCollectors should not return CompletableFutures but results directly:
Set<String> result = Stream.of(1, 2, 3)
.collect(inParallelToSet(i -> foo(), executor));
instead of:
CompletableFuture<Set<String>> result = Stream.of(1, 2, 3)
.collect(inParallelToSet(i -> foo(), executor));
If someone ends up providing parallelism = 1, the whole implementation can be simplified drastically.
public class ParallelDispatcher<T> {
final ExecutorService dispatcher = newSingleThreadExecutor(new CustomThreadFactory());
final Executor executor;
final Queue<Supplier<T>> workingQueue;
final Queue<CompletableFuture<T>> pendingQueue;
public ParallelDispatcher(Executor executor, Queue<Supplier<T>> workingQueue, Queue<CompletableFuture<T>> pendingQueue) {
this.executor = executor;
this.workingQueue = workingQueue;
this.pendingQueue = pendingQueue;
}
}
The internal ThrottlingParallelCollector could be improved.
For now, it first drains the source stream into a working queue and only then starts processing. This could be problematic if we are dealing with a slow producer.
Consumption could start immediately, without waiting for the whole stream to be consumed first.
And replace its usage with ThrottlingDispatcher with parallelism defaulting to Runtime.getRuntime().availableProcessors()
README.md with examples
Currently, most tests are super repetitive copy-pastes of each other.
All because of the time and pain related to working with Java generics... but I still believe this can be simplified to avoid all these copy-pastes.
... maintaining encounter order
It seems like implicit parallelism resolution based on the number of processors is doing more harm than good, and doesn't really go along with the main use case of the project (slow/blocking operations).
For now, all ParallelCollectors schedule each Stream item to be processed separately.
This works just fine in most cases, since it provides natural work-stealing by letting all threads compete for each job separately, but it can be problematic if the original Stream contains a significant number of elements.
Describe the solution you'd like
Introduce API methods that allow choosing a scheduling strategy where batching is one of them.
This could be achieved by introducing a static inner class Batching into ParallelCollectors to provide natural namespacing:
ParallelCollectors.Batching.parallel(i -> i, executor)
This should be a preferred option since adding more parameters into existing methods would make them unacceptable.
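A rough idea of what a batching strategy could look like, written against the JDK only. This is a sketch under assumptions, not the library's implementation: the class BatchingSketch and the methods partition and mapInBatches are hypothetical names, and the input is assumed to be an already-materialized List. The list is split into at most `parallelism` contiguous batches, and each batch is submitted as a single task instead of one task per element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch; not the Batching API proposed in the issue.
final class BatchingSketch {

    // Splits items into at most `parallelism` contiguous batches of near-equal size.
    static <T> List<List<T>> partition(List<T> items, int parallelism) {
        List<List<T>> batches = new ArrayList<>();
        int size = items.size();
        int base = size / parallelism;
        int remainder = size % parallelism;
        int from = 0;
        for (int i = 0; i < parallelism && from < size; i++) {
            int to = from + base + (i < remainder ? 1 : 0);
            batches.add(items.subList(from, to));
            from = to;
        }
        return batches;
    }

    // Submits one task per batch and concatenates the results in encounter order.
    static <T, R> CompletableFuture<List<R>> mapInBatches(
            List<T> items, Function<T, R> mapper, Executor executor, int parallelism) {
        List<CompletableFuture<List<R>>> futures = partition(items, parallelism).stream()
            .map(batch -> CompletableFuture.supplyAsync(
                () -> batch.stream().map(mapper).collect(Collectors.toList()), executor))
            .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> futures.stream()
                .flatMap(f -> f.join().stream())
                .collect(Collectors.toList()));
    }

    // Maps 1..10 to their doubles using 3 batches on a small pool.
    static List<Integer> demo() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            return mapInBatches(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), i -> i * 2, executor, 3)
                .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off is the one the issue describes: fewer, larger tasks reduce scheduling overhead for long streams, at the cost of the per-element work-stealing that one-task-per-item scheduling gives for free.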
They bloat the API and it's not really a problem to achieve this by default (actually the current implementation preserves encounter order)
Some predefined RejectedExecutionHandler implementations can contribute to a deadlock by design; we should check for those and throw an exception preemptively.
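One way such a handler can cause trouble, sketched with the JDK only: ThreadPoolExecutor.DiscardPolicy silently drops a rejected task, so a CompletableFuture created with supplyAsync on a saturated pool never completes, and anything joining on it waits forever. The class name below is made up for illustration, and the sketch does not show how parallel-collectors itself reacts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of parallel-collectors.
final class DiscardPolicyDemo {

    // Returns {running.isDone(), queued.isDone(), discarded.isDone()} after the pool has terminated.
    static boolean[] run() {
        CountDownLatch release = new CountDownLatch(1);
        // One thread, one queue slot, and a handler that drops rejected tasks silently.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1),
            new ThreadPoolExecutor.DiscardPolicy());
        try {
            CompletableFuture<String> running = CompletableFuture.supplyAsync(() -> {
                try {
                    release.await(); // occupy the only worker thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "running";
            }, pool);
            CompletableFuture<String> queued = CompletableFuture.supplyAsync(() -> "queued", pool);
            // Worker busy and queue full: this task is rejected and silently discarded.
            CompletableFuture<String> discarded = CompletableFuture.supplyAsync(() -> "discarded", pool);

            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] {running.isDone(), queued.isDone(), discarded.isDone()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] done = run();
        System.out.println("running=" + done[0] + " queued=" + done[1] + " discarded=" + done[2]);
    }
}
```

A preemptive check could inspect ThreadPoolExecutor.getRejectedExecutionHandler() when the supplied executor happens to be a ThreadPoolExecutor, though whether that is the right place for it is a design decision left open here.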
If we have a look at the base benchmarks (attached), you can see that throughput suffers a lot in non-batching cases when parallelism == 1, and in this case we're effectively dealing with one big batch.
This is quite a peculiar scenario, since one would expect parallelism > 1 in most cases; however, it's still a valid scenario if someone wants to just perform the computation asynchronously without orchestrating things themselves.
Since there's no parallelism involved, we could easily just rely on a different Collector implementation that doesn't bring that much overhead.
/* 972ffbb @ Intel i7-4980HQ (8) @ 2.80GHz, 8u222
Benchmark (parallelism) Mode Cnt Score Error Units
Bench.parallel_batch_collect 1 thrpt 5 10218.766 ± 131.633 ops/s
// ...
Bench.parallel_batch_streaming_collect 1 thrpt 5 10715.432 ± 78.383 ops/s
// ...
Bench.parallel_collect 1 thrpt 5 67.891 ± 1.357 ops/s
// ...
Bench.parallel_streaming 1 thrpt 5 50.920 ± 1.569 ops/s
// ...
*/
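For the parallelism == 1 case, a lighter-weight collector could be as small as the following JDK-only sketch: gather the elements, then run the whole mapping as one asynchronous task on the supplied executor. The class SingleTaskCollector and the method asyncToList are hypothetical names, and this is only one possible shape, not what the library does.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch; not part of parallel-collectors.
final class SingleTaskCollector {

    // Collects the input first, then maps all of it inside a single task on the executor.
    static <T, R> Collector<T, ?, CompletableFuture<List<R>>> asyncToList(
            Function<T, R> mapper, Executor executor) {
        return Collectors.collectingAndThen(
            Collectors.<T>toList(),
            (List<T> items) -> CompletableFuture.supplyAsync(
                () -> items.stream().map(mapper).collect(Collectors.toList()), executor));
    }

    static List<Integer> demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return Stream.of(1, 2, 3)
                .collect(SingleTaskCollector.<Integer, Integer>asyncToList(i -> i + 2, executor))
                .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because only one task is ever submitted, there is no per-element future or dispatching to pay for, which is the overhead the benchmark numbers above point at.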
The current readme suggests this example here:
list.stream()
.collect(parallelToList(i -> fetchFromDb(i), executor, 2)).orTimeout(1000, MILLISECONDS)
.thenAccept(System.out::println)
.thenRun(() -> System.out.println("Finished!"));
And it reiterates on the example later on. Please do not suggest fetching individual values from the database one by one, by ID, especially when an ID set is available, which there obviously is, in the presence of such an ID list / stream.
A lot of junior developers will think "hey we can speed up our app by distributing our trivial query across several threads". The result is saturated threads both on the client and in the database, when batch and/or bulk SQL operations would have been much better for both systems.
As API / tool vendors, it is our responsibility to show simple examples that do not misguide junior developers into thinking that a quick fix will really solve their problems.
Is your feature request related to a problem? Please describe.
The dispatcher in Dispatcher wraps the executor that is passed in, which makes it hard to use Spring Security Concurrency Support.
Describe the solution you'd like
Make it possible to configure the dispatcher implementation (or maybe some other solution that would fix this)
Describe alternatives you've considered
I don't see any alternatives
Additional context
The default (and recommended) implementation of the security context holder strategy uses thread locals to keep track of the current security context, so by default it doesn't get passed on to code executing in thread pools. That's where the concurrency support comes in: it injects the security context from the current thread into the new one.
So when we use parallel-collectors, the security context is passed on to the dispatcher thread which does not pass it on to the executor thread.
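The effect can be reproduced without Spring. The sketch below uses a plain ThreadLocal as a stand-in for the security context and a hand-written context-propagating Executor wrapper as a stand-in for the concurrency support (all names are made up for illustration; this is not Spring Security's implementation). When the wrapper is invoked from the caller's thread, the value reaches the worker; when an intermediate dispatcher thread does the submitting, the wrapper captures that thread's empty context instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; a ThreadLocal stands in for the security context.
final class ContextPropagationDemo {

    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Captures the context on the submitting thread and installs it on the worker thread.
    static Executor propagating(Executor delegate) {
        return task -> {
            String captured = CONTEXT.get();
            delegate.execute(() -> {
                CONTEXT.set(captured);
                try {
                    task.run();
                } finally {
                    CONTEXT.remove();
                }
            });
        };
    }

    // The caller submits directly: the worker sees the caller's context.
    static String submittedByCaller() {
        return observe(false);
    }

    // A dispatcher thread submits on the caller's behalf: the context is lost on the way.
    static String submittedByDispatcher() {
        return observe(true);
    }

    private static String observe(boolean viaDispatcher) {
        ExecutorService workers = Executors.newFixedThreadPool(1);
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.set("alice");
            Executor wrapped = propagating(workers);
            CompletableFuture<String> seen = new CompletableFuture<>();
            Runnable task = () -> seen.complete(CONTEXT.get());
            if (viaDispatcher) {
                dispatcher.execute(() -> wrapped.execute(task));
            } else {
                wrapped.execute(task);
            }
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            dispatcher.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct: " + submittedByCaller());
        System.out.println("via dispatcher: " + submittedByDispatcher());
    }
}
```

This is why making the dispatcher configurable (or having it capture context on the calling thread) would matter for thread-local based integrations.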
Builder
withParallelism()
toCollection()
task()
Currently, the ParallelCollectors facade exposes Collector implementations that cap the parallelism level at Integer.MAX_VALUE, which might be problematic if the tool ends up being used by someone who just skimmed the documentation.
I'm thinking we should deprecate those static factory methods and always force users to provide parallelism explicitly. Additionally, we could benefit from a simplified API (6 instead of 12 factory methods).
What do you think?
Another option is to still allow that but provide some reasonable defaults.
toStreamInParallel(Executor executor)
toStreamInParallel(Executor executor, int parallelism)
Need to return values in completion order.
Is your feature request related to a problem? Please describe.
I would like to use parallel-collectors in a Spring Boot application with Spring Cloud Sleuth for tracing requests.
The problem is that code executed via parallel-collectors loses the request tracing information. I think it happens because com.pivovarit.collectors.Dispatcher executes the tasks with a newLazySingleThreadExecutor() that Spring Cloud is not able to instrument.
Describe the solution you'd like
It would be very helpful if com.pivovarit.collectors.Dispatcher allowed us to configure the Executor that runs the tasks, instead of creating and using a newLazySingleThreadExecutor() that Spring Cloud Sleuth cannot instrument.
I think Spring Cloud Sleuth uses org.springframework.cloud.sleuth.instrument.async.TraceableExecutorService to support tracing in Executors.
Describe alternatives you've considered
I've tried removing the newLazySingleThreadExecutor() from Dispatcher and using the executor to execute the tasks. It seems to work properly, but some tests failed, so it seems to be a bad solution.
Additional context
I've attached a simple spring boot with spring cloud sleuth application.
spring-boot-sleuth-parallel-collectors.zip
Start the application with ./mvnw spring-boot:run
Go to http://localhost:8080 in your browser and check the application logs.
A message is written to the log when transforming some numbers with parallel-collectors, but the spring cloud sleuth "trace-id" is different for every thread:
The expected behavior would be to have the same "trace-id" for every thread:
Thank you so much for creating parallel-collectors!
XFrog triggers an alert on package: com.pivovarit:parallel-collectors
Looks like com.pivovarit is not registered, hence the groupId can be claimed by a malicious user.
Is your feature request related to a problem? Please describe.
I would like to add an example integrated into a Spring Boot application.
Describe the solution you'd like
I would like to add an example, explained as a use case for the library.
Many thanks in advance.
public static <T> Collector<Supplier<T>, List<CompletableFuture<T>>, CompletableFuture<Void>> parallelConsumeOn(Executor executor) {
requireNonNull(executor, "executor can't be null");
// ...
}
public static <T> Collector<Supplier<T>, List<CompletableFuture<T>>, CompletableFuture<Void>> parallelConsumeOn(Executor executor, int parallelism) {
requireNonNull(executor, "executor can't be null");
// ...
}