parallel-collectors's Issues

Inconsistent behavior when parallel=1

To parallelize some side effects, I wrote the following code, which works as expected on my local machine:

public static void main( String[ ] args ) {
    var executor = Executors.newFixedThreadPool( 10 );
    try {
        var list = List.of( "A", "B" );

        list.stream( )
                .collect( ParallelCollectors.parallel( TestCase::processRecord, executor ) )
                .join( );
    } finally {
        executor.shutdown( );
    }
}

private static String processRecord( String record ) {
    System.out.println( Thread.currentThread( ).getName( ) );
    return record;
}

$ docker run --rm testcase
pool-1-thread-2
pool-1-thread-1

However, it took me a while to figure out why it doesn't work when deployed on our cluster (Mesos & K8S). Eventually, I figured out that since release 2.3.0, parallel-collectors behaves inconsistently between parallel=1 and parallel>1.

$ docker run --rm --cpuset-cpus=0 testcase
$ docker run --rm --cpuset-cpus=0,1 testcase
$ docker run --rm --cpuset-cpus=0,1,2 testcase
pool-1-thread-2
pool-1-thread-1

When parallel=1, the mapping function is only invoked when the stream returned by parallel().join() is consumed by a terminal operation. Even then, some terminal operations such as count() won't invoke it, because the mapping step can be optimized away.

public static void main( String[ ] args ) {
    var executor = Executors.newFixedThreadPool( 10 );
    try {
        var list = List.of( "A", "B" );

        System.out.println( "count" );
        list.stream( )
                .collect( ParallelCollectors.parallel( TestCase::processRecord, executor ) )
                .join( )
                .count( );

        System.out.println( "toList" );
        list.stream( )
                .collect( ParallelCollectors.parallel( TestCase::processRecord, executor ) )
                .join( )
                .collect( Collectors.toList( ) );
    } finally {
        executor.shutdown( );
    }
}

$ docker run --cpuset-cpus=0 --rm testcase
count
toList
main
main

When parallel>1, mapping is always performed eagerly on the provided executor (no need to consume the stream).

This seems to be a bug: the Javadoc says that the parallel computation will be performed on a custom executor, which is not true in this case. It is also dangerous, as it seems too easy to write the same bogus code as I did.

I propose reverting to the 2.2.0 behaviour.
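
The laziness described in this issue can be reproduced with the JDK alone. The following is a minimal sketch, assuming Java 9 or later (where `count()` is allowed to skip the pipeline when the element count is known from the source). `LazyStreamDemo`, `process` and `mapped` are hypothetical names and are not part of parallel-collectors.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamDemo {

    // Counts how many times the mapping function actually ran.
    public static final AtomicInteger invocations = new AtomicInteger();

    public static String process(String record) {
        invocations.incrementAndGet();
        return record;
    }

    // Builds a lazy pipeline: nothing runs until a terminal operation consumes it.
    public static Stream<String> mapped() {
        return List.of("A", "B").stream().map(LazyStreamDemo::process);
    }

    public static void main(String[] args) {
        mapped(); // no terminal operation, so process() is never called
        System.out.println("after building the stream: " + invocations.get());

        mapped().count(); // may skip process() because the size is already known
        System.out.println("after count(): " + invocations.get());

        mapped().collect(Collectors.toList()); // pushes every element through process()
        System.out.println("after collect(toList()): " + invocations.get());
    }
}
```

Any side effect placed in the mapping function of such a stream therefore depends on which terminal operation the caller happens to use.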

Short-circuit when exception gets thrown

In case an exception gets thrown, collectors should immediately terminate operations and return a future completed exceptionally.

@Test
void shouldCollectToCollectionAndShortCircuitOnException() {

    // given
    executor = threadPoolExecutor(1);
    LongAdder counter = new LongAdder();

    try {
        IntStream.generate(() -> 42).boxed().limit(10)
          .map(i -> supplier(() -> {
              counter.increment();
              throw new IllegalArgumentException();
          }))
          .collect(parallelToList(executor, 1))
          .join();
    } catch (Exception e) {
    }

    assertThat(counter.longValue()).isOne();
}

Implement collectors accepting mappers as parameters

public static <T, R> Collector<T, List<CompletableFuture<R>>, CompletableFuture<List<R>>> toListInParallel(Function<T, R> mapper, Executor executor)

So that users are not forced to implement an intermediate operation of creating Supplier instances.
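
A rough sketch of what such a mapper-accepting collector could look like, built only from JDK pieces. This is an illustration under assumptions, not the library's implementation: the accumulator type is left as `?` instead of `List<CompletableFuture<R>>`, there is no parallelism limit, and `MapperCollectorSketch` and `combine` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MapperCollectorSketch {

    // Hypothetical factory: submits mapper.apply(item) to the executor for each element.
    public static <T, R> Collector<T, ?, CompletableFuture<List<R>>> toListInParallel(
            Function<T, R> mapper, Executor executor) {
        Function<T, CompletableFuture<R>> submit =
                item -> CompletableFuture.supplyAsync(() -> mapper.apply(item), executor);
        Collector<CompletableFuture<R>, ?, CompletableFuture<List<R>>> combining =
                Collectors.collectingAndThen(Collectors.toList(), futures -> combine(futures));
        return Collectors.mapping(submit, combining);
    }

    // Turns a list of futures into one future holding the results in encounter order.
    static <R> CompletableFuture<List<R>> combine(List<CompletableFuture<R>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Integer> result = Stream.of(1, 2, 3)
                    .collect(toListInParallel(i -> i * 2, executor))
                    .join();
            System.out.println(result); // [2, 4, 6]
        } finally {
            executor.shutdown();
        }
    }
}
```

The caller passes a plain `Function` and no longer needs the intermediate `.map(i -> supplier(...))` step.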

High executor thread consumption

Describe the bug

Upon upgrading parallel-collectors from version 2.5.0 to 2.6.0, we've observed an increase in thread consumption.
We are using an org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor with the following parameters: corePoolSize set to 1500, maxPoolSize set to 3000, and queueCapacity set to 100.

While on version 2.5.0, we never observed corePoolSize exceeding 1500.
However, upon upgrading to version 2.6.0, corePoolSize increases from 1500 a few seconds after application startup. No other changes were made.

To Reproduce
I've prepared a unit test that passes on the 2.5.0 tag but fails on the 2.6.0 tag and the master branch:

@Test
void shouldExecuteParallelStreamsSequentially() {

    int tasks = 2;
    ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(tasks,
                                                           tasks,
                                                           0L,
                                                           MILLISECONDS,
                                                           new LinkedBlockingQueue<>(tasks));

    Stream.of(210, 200, 160, 180)
        .collect(parallelToStream(i -> returnWithDelay(i, ofMillis(i)), myExecutor, tasks))
        .collect(toList());

    // 2.6.0 and master fail from here
    Stream.of(210, 200, 160, 180)
        .collect(parallelToStream(i -> returnWithDelay(i, ofMillis(i)), myExecutor, tasks))
        .collect(toList());
}

Expected behavior
I expect the unit test to pass in master branch and 2.6.0 tag.

Thank you in advance.
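
The issue does not say how the test fails. For orientation only: an executor configured like `myExecutor` above (2 threads, a queue of 2, default rejection policy) accepts at most 4 outstanding tasks and rejects the next one with `RejectedExecutionException`. The sketch below demonstrates that JDK behaviour in isolation. Whether this is the failure seen on 2.6.0 is an assumption, and `BoundedPoolDemo` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {

    // Submits blocking tasks until the pool rejects one and returns how many were accepted.
    public static int acceptedBeforeRejection(int threads, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                // Each task parks until the latch opens, so no worker becomes free.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            return accepted;
        } finally {
            release.countDown(); // let the parked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptedBeforeRejection(2, 2)); // 4
    }
}
```

Any extra task submitted while the collector's own tasks are still outstanding would push such an executor over that limit.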

Is Filtering/FlatMap supported?

Hi,

It's a very nice project, and I want to use it.
I have a question: can I use the filtering and flatMap functionality that is available in streams with parallel-collectors?

Implement ParallelCollectors.toCombinedFuture

Let's add a method returning a simple collector that collects a stream of futures into a single future.

public static void main(String[] args) {
    CompletableFuture<List<Integer>> collect = Stream.of(1, 2, 3)
      .map(i -> CompletableFuture.supplyAsync(() -> i))
      .collect(toFuture());
}

public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> toFuture() {
    return Collectors.collectingAndThen(Collectors.toList(), list -> {
        CompletableFuture<List<T>> future = CompletableFuture
          .allOf(list.toArray(new CompletableFuture[0]))
          .thenApply(__ -> list.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));

        // Fail fast: complete the combined future as soon as any input future fails.
        for (CompletableFuture<T> f : list) {
            f.whenComplete((integer, throwable) -> {
                if (throwable != null) {
                    future.completeExceptionally(throwable);
                }
            });
        }

        return future;
    });
}

Introduce API methods that allow reusing native Java Collectors

  • <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Collector<R, ?, RR> c, Function<T, R> m, Executor e, int parallelism)
  • <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> m, Executor e, int parallelism)

A signature like this would allow reducing API surface by getting rid of *toList/toMap/toSet factories and replacing them with one API that allows users to pass their own Java Collectors.

So, instead of:

  • .collect(parallelToSet(i -> i + 2, executor, 2));
  • .collect(parallelToList(i -> i + 2, executor, 7));

One would expect:

  • CF<Set<Integer>> r = ...collect(parallel(toSet(), i -> i + 2, executor, 2));
  • CF<List<Integer>> r = ...collect(parallel(toList(), i -> i + 2, executor, 2));

The Collector parameter could be omitted, which would result in a CF containing an unprocessed Stream instance:

  • CF<Stream<Integer>> r = ...collect(parallel(i -> i + 2, executor, 2));

The difference for users is subtle, but it would greatly simplify the implementation.
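
To make the proposed shape concrete, here is a minimal JDK-only sketch of a `parallel` factory that accepts a user-supplied Collector. It is an illustration under assumptions rather than the library's code: the `parallelism` parameter is left out for brevity, and `ParallelWithCollectorSketch` and `combine` are hypothetical names.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ParallelWithCollectorSketch {

    // Maps each element on the executor, then feeds the results to the caller's collector.
    public static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(
            Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor) {
        Function<T, CompletableFuture<R>> submit =
                item -> CompletableFuture.supplyAsync(() -> mapper.apply(item), executor);
        Collector<CompletableFuture<R>, ?, CompletableFuture<RR>> combining =
                Collectors.collectingAndThen(Collectors.toList(), futures -> combine(futures, collector));
        return Collectors.mapping(submit, combining);
    }

    // Waits for all futures, then collects their values with the supplied collector.
    static <R, RR> CompletableFuture<RR> combine(
            List<CompletableFuture<R>> futures, Collector<R, ?, RR> collector) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(collector));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Set<Integer> asSet = Stream.of(1, 2, 3)
                    .collect(parallel(Collectors.toSet(), i -> i + 2, executor))
                    .join();
            List<Integer> asList = Stream.of(1, 2, 3)
                    .collect(parallel(Collectors.toList(), i -> i + 2, executor))
                    .join();
            System.out.println(asSet.equals(Set.of(3, 4, 5))); // true
            System.out.println(asList); // [3, 4, 5]
        } finally {
            executor.shutdown();
        }
    }
}
```

One factory covers every target container, which is the reduction in API surface the proposal is after.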

Implement blocking alternatives

Blocking ParallelCollectors should not return CompletableFutures but results directly:

Set<String> result = Stream.of(1, 2, 3)
  .collect(inParallelToSet(i -> foo(), executor));

instead of:

CompletableFuture<Set<String>> result = Stream.of(1, 2, 3)
  .collect(inParallelToSet(i -> foo(), executor));

Extract common logic to ParallelDispatcher

public class ParallelDispatcher<T> {

    final ExecutorService dispatcher = newSingleThreadExecutor(new CustomThreadFactory());
    final Executor executor;
    final Queue<Supplier<T>> workingQueue;
    final Queue<CompletableFuture<T>> pendingQueue;

    public ParallelDispatcher(Executor executor, Queue<Supplier<T>> workingQueue, Queue<CompletableFuture<T>> pendingQueue) {
        this.executor = executor;
        this.workingQueue = workingQueue;
        this.pendingQueue = pendingQueue;
    }
}

Improve ThrottlingParallelCollector implementation

Internal ThrottlingParallelCollector could be improved.

Currently, it first drains the source stream into a working queue and only then starts processing. This could be problematic if we are dealing with a slow producer.

Consumption could start immediately, without waiting for the whole stream to be consumed first.
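
One possible shape for this, sketched with JDK classes only: the accumulator submits each element as soon as the stream produces it and uses a `Semaphore` to cap the number of in-flight tasks. This is an assumption-laden illustration, not the internal ThrottlingParallelCollector. In particular, it throttles by blocking the thread that drives the stream, and `EagerThrottlingSketch`, `throttled` and `slowSquare` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EagerThrottlingSketch {

    public static final AtomicInteger running = new AtomicInteger();
    public static final AtomicInteger maxRunning = new AtomicInteger();

    public static <T, R> Collector<T, List<CompletableFuture<R>>, CompletableFuture<List<R>>> throttled(
            Function<T, R> mapper, Executor executor, int parallelism) {
        Semaphore permits = new Semaphore(parallelism);
        return Collector.of(
                ArrayList::new,
                (futures, item) -> {
                    // Waits only while `parallelism` tasks are in flight, then submits immediately.
                    permits.acquireUninterruptibly();
                    futures.add(CompletableFuture.supplyAsync(() -> {
                        try {
                            return mapper.apply(item);
                        } finally {
                            permits.release();
                        }
                    }, executor));
                },
                (left, right) -> {
                    left.addAll(right);
                    return left;
                },
                futures -> CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                        .thenApply(ignored -> futures.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList())));
    }

    // Demo task that records the highest number of concurrently running invocations.
    public static int slowSquare(int i) {
        int now = running.incrementAndGet();
        maxRunning.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        running.decrementAndGet();
        return i * i;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Integer> result = Stream.of(1, 2, 3, 4, 5)
                    .collect(throttled(i -> slowSquare(i), executor, 2))
                    .join();
            System.out.println(result); // [1, 4, 9, 16, 25]
            System.out.println("max concurrent tasks <= 2: " + (maxRunning.get() <= 2));
        } finally {
            executor.shutdown();
        }
    }
}
```

With a slow producer, the first tasks are already running while later elements are still being produced.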

Remove UnboundedDispatcher

And replace its usage with ThrottlingDispatcher with parallelism defaulting to Runtime.getRuntime().availableProcessors()

Technical Writing

  • JavaDocs
  • README.md with examples
  • "Parallel Collection Processing in Java without Parallel Streams" @4comprehension.com
  • social media announcement
    • Reddit
    • Twitter
    • LinkedIn

Remove repetition from tests

Currently, most tests are super repetitive copy-pastes of each other.

This is all down to the time and pain involved in working with Java generics, but I still believe the tests can be simplified to avoid this kind of copy-pasting.

Enable batching scheduling strategy

For now, all ParallelCollectors schedule each Stream item to be processed separately.

This works just fine in most cases, since it provides natural work-stealing by letting all threads compete for each job separately, but it can be problematic if the original Stream contains a significant number of elements.

Describe the solution you'd like
Introduce API methods that allow choosing a scheduling strategy where batching is one of them.

This could be achieved by introducing a static inner class Batching into ParallelCollectors to provide natural namespacing:

ParallelCollectors.Batching.parallel(i -> i, executor)

This should be the preferred option, since adding more parameters to the existing methods would make them unacceptable.
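
As an illustration of what a batching strategy amounts to, the sketch below splits the input into at most `parallelism` contiguous batches and schedules one task per batch instead of one task per element. It uses only the JDK. `BatchingSketch`, `partition` and `mapInBatches` are hypothetical names, not the proposed `ParallelCollectors.Batching` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class BatchingSketch {

    // Splits items into at most `batches` contiguous chunks whose sizes differ by at most one.
    public static <T> List<List<T>> partition(List<T> items, int batches) {
        if (batches < 1) {
            throw new IllegalArgumentException("batches must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        int base = items.size() / batches;
        int remainder = items.size() % batches;
        int from = 0;
        for (int i = 0; i < batches && from < items.size(); i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(items.subList(from, from + size)));
            from += size;
        }
        return result;
    }

    // Schedules one task per batch; each task maps its whole batch sequentially.
    public static <T, R> CompletableFuture<List<R>> mapInBatches(
            List<T> items, Function<T, R> mapper, Executor executor, int parallelism) {
        List<CompletableFuture<List<R>>> futures = new ArrayList<>();
        for (List<T> batch : partition(items, parallelism)) {
            futures.add(CompletableFuture.supplyAsync(
                    () -> batch.stream().map(mapper).collect(Collectors.toList()), executor));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .flatMap(f -> f.join().stream())
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            System.out.println(partition(List.of(1, 2, 3, 4, 5, 6, 7), 3)); // [[1, 2, 3], [4, 5], [6, 7]]
            System.out.println(mapInBatches(List.of(1, 2, 3, 4, 5, 6, 7), i -> i * 10, executor, 3).join());
            // [10, 20, 30, 40, 50, 60, 70]
        } finally {
            executor.shutdown();
        }
    }
}
```

The trade-off is the one named above: fewer scheduled tasks, at the cost of losing per-element work-stealing.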

Optimize parallelism == 1 edge cases

If we have a look at the base benchmarks (attached), we can see that throughput suffers a lot in the non-batching cases when parallelism == 1, even though in this case we're effectively dealing with one big batch.

This is quite a peculiar scenario since one would expect parallelism > 1 in most cases, however, it's still a valid scenario if someone wants to just perform the computation asynchronously without orchestrating things themselves.

Since there's no parallelism involved, we could easily just rely on a different Collector implementation that doesn't bring that much overhead.


/* 972ffbb @ Intel i7-4980HQ (8) @ 2.80GHz, 8u222
Benchmark                               (parallelism)   Mode  Cnt      Score     Error  Units
Bench.parallel_batch_collect                        1  thrpt    5  10218.766 ± 131.633  ops/s
// ...
Bench.parallel_batch_streaming_collect              1  thrpt    5  10715.432 ±  78.383  ops/s
// ...
Bench.parallel_collect                              1  thrpt    5     67.891 ±   1.357  ops/s
// ...
Bench.parallel_streaming                            1  thrpt    5     50.920 ±   1.569  ops/s
// ...
 */
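
A minimal sketch of the idea, assuming the simplest possible approach: when parallelism is 1, collect the elements first and run the whole mapping as a single task on the executor. This is an illustration and not the implementation that was eventually adopted. `SingleTaskSketch` and `asyncSequential` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SingleTaskSketch {

    // Gathers all elements, then maps them sequentially inside one asynchronous task.
    public static <T, R> Collector<T, ?, CompletableFuture<List<R>>> asyncSequential(
            Function<T, R> mapper, Executor executor) {
        Function<List<T>, CompletableFuture<List<R>>> runAll =
                items -> CompletableFuture.supplyAsync(
                        () -> items.stream().map(mapper).collect(Collectors.toList()), executor);
        return Collectors.collectingAndThen(Collectors.toList(), runAll);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<String> threads = Stream.of(1, 2, 3)
                    .collect(asyncSequential(i -> Thread.currentThread().getName(), executor))
                    .join();
            // All three elements are mapped by the same pool thread.
            System.out.println(threads.stream().distinct().count()); // 1
        } finally {
            executor.shutdown();
        }
    }
}
```

Only one task is submitted and only one future is created, which avoids the per-element scheduling overhead visible in the benchmark above. The mapping still runs eagerly on the provided executor.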

Please avoid suggesting parallel N+1 database interaction patterns

The current readme suggests this example here:

list.stream()
  .collect(parallelToList(i -> fetchFromDb(i), executor, 2)).orTimeout(1000, MILLISECONDS)
  .thenAccept(System.out::println)
  .thenRun(() -> System.out.println("Finished!"));

It also repeats the example later on. Please do not suggest fetching individual values from the database one by one, by ID, especially when an ID set is available, which it obviously is when such an ID list / stream is present.

A lot of junior developers will think "hey we can speed up our app by distributing our trivial query across several threads". The result is saturated threads both on the client and in the database, when batch and/or bulk SQL operations would have been much better for both systems.

As API / tool vendors, it is our responsibility to show simple examples that do not misguide junior developers into thinking that a quick fix will really solve their problems.
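
The difference can be sketched without a real database. In the following illustration, an in-memory map stands in for a table and a counter stands in for round trips. `BulkFetchSketch`, `fetchAllFromDb`, `TABLE` and `roundTrips` are hypothetical names, and `fetchAllFromDb` is assumed to represent a single `WHERE id IN (...)` style query.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

class BulkFetchSketch {

    // In-memory stand-in for a database table.
    static final Map<Integer, String> TABLE = Map.of(1, "a", 2, "b", 3, "c");

    // Counts simulated round trips to the database.
    public static final AtomicInteger roundTrips = new AtomicInteger();

    // One round trip per ID: the N+1 pattern, no matter how many threads run it.
    public static String fetchFromDb(int id) {
        roundTrips.incrementAndGet();
        return TABLE.get(id);
    }

    // One round trip for the whole ID set, like a single "WHERE id IN (...)" query.
    public static Map<Integer, String> fetchAllFromDb(Collection<Integer> ids) {
        roundTrips.incrementAndGet();
        Map<Integer, String> rows = new TreeMap<>();
        for (Integer id : ids) {
            rows.put(id, TABLE.get(id));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3);

        List<String> oneByOne = new ArrayList<>();
        for (int id : ids) {
            oneByOne.add(fetchFromDb(id));
        }
        System.out.println(oneByOne + " in " + roundTrips.getAndSet(0) + " round trips"); // [a, b, c] in 3 round trips

        Map<Integer, String> bulk = fetchAllFromDb(ids);
        System.out.println(bulk.values() + " in " + roundTrips.getAndSet(0) + " round trips"); // [a, b, c] in 1 round trips
    }
}
```

Running the per-ID variant on several threads shortens the wall-clock time but leaves the number of queries unchanged.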

Make it possible to configure the dispatcher instance in the Dispatcher

Is your feature request related to a problem? Please describe.
The dispatcher in Dispatcher is used to wrap the executor that is passed in, which makes it hard to use Spring Security Concurrency Support.

Describe the solution you'd like
Make it possible to configure the dispatcher implementation (or maybe some other solution that would fix this).

Describe alternatives you've considered
I don't see any alternatives

Additional context
The default (and recommended) security context holder strategy uses thread locals to keep track of the current security context, so by default the context is not passed on to code executing in thread pools. That is where the concurrency support comes in: it injects the security context from the current thread into the new one.
So when we use parallel-collectors, the security context is passed on to the dispatcher thread, which does not pass it on to the executor thread.
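
The effect can be reproduced with a plain `ThreadLocal`. The sketch below is a simplified stand-in for a context-propagating executor wrapper and is not Spring Security's code. It captures the context on whichever thread calls `execute()`, so a hand-off through an intermediate, unwrapped thread loses it. `ContextPropagationSketch`, `CONTEXT`, `propagating`, `direct` and `viaDispatcher` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationSketch {

    // Stand-in for a thread-local security context.
    public static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Captures the context of the thread calling execute() and restores it around the task.
    public static Executor propagating(Executor delegate) {
        return task -> {
            String captured = CONTEXT.get();
            delegate.execute(() -> {
                String previous = CONTEXT.get();
                CONTEXT.set(captured);
                try {
                    task.run();
                } finally {
                    CONTEXT.set(previous);
                }
            });
        };
    }

    // Caller thread -> wrapped worker: the caller's context is captured.
    public static String direct(Executor worker) {
        return CompletableFuture.supplyAsync(CONTEXT::get, propagating(worker)).join();
    }

    // Caller thread -> unwrapped dispatcher thread -> wrapped worker:
    // execute() runs on the dispatcher thread, which has no context to capture.
    public static String viaDispatcher(Executor dispatcher, Executor worker) {
        Executor wrapped = propagating(worker);
        return CompletableFuture
                .supplyAsync(() -> CompletableFuture.supplyAsync(CONTEXT::get, wrapped).join(), dispatcher)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newFixedThreadPool(2);
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.set("alice");
            System.out.println("direct: " + direct(worker)); // direct: alice
            System.out.println("via dispatcher: " + viaDispatcher(dispatcher, worker)); // via dispatcher: null
        } finally {
            CONTEXT.remove();
            worker.shutdown();
            dispatcher.shutdown();
        }
    }
}
```

Being able to configure or wrap the intermediate thread, as requested above, would close that gap.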

Should unlimited parallelism level be allowed?

Currently, the ParallelCollectors facade exposes Collector implementations that cap the parallelism level at Integer.MAX_VALUE, which might be problematic if the tool ends up being used by someone who just skimmed over the documentation.

I'm thinking we should deprecate those static factory methods and always force users to provide parallelism explicitly. Additionally, we could benefit from a simplified API (6 instead of 12 factory methods).

What do you think?
Another option is to still allow that but provide some reasonable defaults.

Support spring cloud sleuth

Is your feature request related to a problem? Please describe.
I would like to use parallel-collectors in a spring boot application with spring cloud sleuth for tracing requests.
The problem is that the code executed on parallel-collectors loses the request tracing information. I think it happens because com.pivovarit.collectors.Dispatcher executes the tasks with a newLazySingleThreadExecutor() that spring cloud is not able to instrument.

Describe the solution you'd like
It would be very helpful if com.pivovarit.collectors.Dispatcher allowed us to configure the Executor that runs the tasks, instead of creating and using a newLazySingleThreadExecutor() that spring cloud sleuth cannot instrument.
I think that spring cloud sleuth uses org.springframework.cloud.sleuth.instrument.async.TraceableExecutorService to support tracing in Executors.

Describe alternatives you've considered
I've tried removing the newLazySingleThreadExecutor() from Dispatcher and using the executor to execute the tasks directly. It seems to work properly, but some tests failed, so it seems to be a bad solution.

Additional context
I've attached a simple spring boot with spring cloud sleuth application.
spring-boot-sleuth-parallel-collectors.zip

Start the application with ./mvnw spring-boot:run
Go to http://localhost:8080 in your browser and check the application logs.

A message is written to the log when transforming some numbers with parallel-collectors, but the spring cloud sleuth "trace-id" is different for every thread:
[Screenshot: Selection_721]

The expected behavior would be to have the same "trace-id" for every thread:
[Screenshot: Selection_722]

Thank you so much for creating parallel-collectors!

MavenGate (CVE)

XFrog triggers an alert on package: com.pivovarit:parallel-collectors

It looks like com.pivovarit is not registered, so the groupId could be claimed by a malicious user.

Add an example integrated with Spring Boot

Is your feature request related to a problem? Please describe.

I would like to add an example integrated into a Spring Boot application.

Describe the solution you'd like

I would like to add an example that explains a use case for the library.

Many thanks in advance.

Implement parallelConsume()

public static <T> Collector<Supplier<T>, List<CompletableFuture<T>>, CompletableFuture<Void>> parallelConsumeOn(Executor executor) {
    requireNonNull(executor, "executor can't be null");
    // ...
}

public static <T> Collector<Supplier<T>, List<CompletableFuture<T>>, CompletableFuture<Void>> parallelConsumeOn(Executor executor, int parallelism) {
    requireNonNull(executor, "executor can't be null");
    // ...
}
