Java ParallelStream: several map or single map

Introduction

I'm currently developing a program in which I use java.util.Collection.parallelStream(), and I'm wondering whether it's possible to make it more multi-threaded.

Several small maps

I was wondering whether using multiple map() calls might allow java.util.Collection.parallelStream() to distribute the tasks better:

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
        .map(gson::toJson)
        .map(Document::parse)
        .map(InsertOneModel::new)
        .toList();

Single big map

For example, would that give a better distribution than a single map() call like this:

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
        .map(puzzle -> new InsertOneModel<>(Document.parse(gson.toJson(puzzle))))
        .toList();

Question

Is one of these solutions more suitable for java.util.Collection.parallelStream(), or is there no big difference between the two?



Solution 1:[1]

I looked into the Stream source code. The result of a map() operation is simply fed into the next operation, so there is almost no difference between one big map() call and several small map() calls.

For the map() operation itself, using a parallel Stream makes no difference to this: each input object is processed through the whole chain of map() calls by the same Thread in either case.
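The point about one thread per element can be checked with a small experiment. The following is a minimal sketch, not taken from the Stream source: the class name FusedMapDemo and its method are hypothetical, and the two map() stages only record which thread touched each element. It relies on the behaviour described above (stateless stages being run back to back for each element), as observed in current OpenJDK builds.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical demo class: records the thread that runs each map() stage.
class FusedMapDemo {

    /** Returns true if every element saw the same thread in both map() stages. */
    static boolean sameThreadPerElement(int size) {
        Map<Integer, Thread> firstStage = new ConcurrentHashMap<>();
        Map<Integer, Thread> secondStage = new ConcurrentHashMap<>();

        List<Integer> input = IntStream.range(0, size).boxed().toList();

        input.parallelStream()
                // Stage 1: remember which thread handled element i.
                .map(i -> { firstStage.put(i, Thread.currentThread()); return i; })
                // Stage 2: remember it again for the same element.
                .map(i -> { secondStage.put(i, Thread.currentThread()); return i; })
                .toList();

        // Equal maps mean no element changed threads between the stages.
        return firstStage.equals(secondStage);
    }

    public static void main(String[] args) {
        System.out.println("same thread per element: " + sameThreadPerElement(10_000));
    }
}
```

Different elements may still land on different threads; the sketch only checks that a single element does not hop between threads from one map() call to the next.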

Also note: a parallel Stream only splits up the work if the source and the operation chain allow it and there is enough data to process. So for a very small Collection, or for a Collection that splits poorly (for example, one that allows no random access), a parallel Stream can end up behaving much like a sequential Stream.

Solution 2:[2]

The three steps (toJson/parse/new) have to be executed sequentially, so all you're effectively doing is comparing s.map(g.compose(f)) and s.map(f).map(g). By virtue of being a monad, Java Streams are functors, and the 2nd functor law states that, in essence, s.map(g.compose(f)) == s.map(f).map(g), meaning that the two alternative ways of expressing the computation will produce identical results. From a performance standpoint the difference between the two is likely to be minimal.
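As a quick illustration of that law, here is a minimal sketch with two stand-in functions. F and G are hypothetical placeholders for the toJson/parse steps, not the real Gson or MongoDB calls, and both are free of side effects, which the law assumes.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical demo class comparing s.map(f).map(g) with s.map(g.compose(f)).
class MapCompositionDemo {

    // Stand-in for the first step (e.g. toJson): Integer -> String.
    static final Function<Integer, String> F = i -> "#" + i;
    // Stand-in for the second step (e.g. parse): String -> Integer.
    static final Function<String, Integer> G = String::length;

    /** Two separate map() calls. */
    static List<Integer> chained(List<Integer> in) {
        return in.parallelStream().map(F).map(G).toList();
    }

    /** One map() call with the composed function; g.compose(f) applies f first, then g. */
    static List<Integer> composed(List<Integer> in) {
        return in.parallelStream().map(G.compose(F)).toList();
    }

    public static void main(String[] args) {
        List<Integer> in = List.of(1, 22, 333);
        System.out.println(chained(in).equals(composed(in)));
    }
}
```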

However, in general you should be careful when using Collection.parallelStream(). It uses the common ForkJoinPool, essentially a fixed pool of threads shared across the entire JVM. By default, the size of the pool is derived from the number of cores on the host. The problem with using the common pool is that other threads in the same process may be using it at the same time as your code. This can cause your code to slow down seemingly at random, for example when another part of the program has temporarily exhausted the common pool.

It is preferable to create your own ExecutorService using one of the factory methods on Executors, and then submit your tasks to that.
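To see how large the shared pool is on a given machine, something like the following can be used. This is a minimal sketch; the class name CommonPoolInfo is hypothetical. The target parallelism can also be overridden at JVM startup with the system property java.util.concurrent.ForkJoinPool.common.parallelism.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class that reports the size of the JVM-wide common pool.
class CommonPoolInfo {

    /** Target parallelism of the common pool that parallel streams use by default. */
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonPoolParallelism());
    }
}
```

On typical OpenJDK builds the reported parallelism is slightly lower than the processor count, because the thread that starts the stream also takes part in the work; treat that as an observation rather than a guarantee.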

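// Dedicated pool, so this work does not compete for the JVM-wide common pool.
// The size of 16 is arbitrary; tune it for your workload.
private static final ExecutorService EX_SVC = Executors.newFixedThreadPool(16);

// Assumes the same gson instance as in the question is in scope.
public static List<InsertOneModel<Document>> process(Stream<Puzzle> puzzles) throws InterruptedException {
    // Wrap each puzzle's conversion in a Callable so the pool can run them independently.
    final Collection<Callable<InsertOneModel<Document>>> callables =
            puzzles.map(puzzle ->
                    (Callable<InsertOneModel<Document>>)
                            () -> new InsertOneModel<>(Document.parse(gson.toJson(puzzle)))
            ).collect(Collectors.toList());

    // invokeAll blocks until every task has completed and returns the futures
    // in the same order as the submitted tasks.
    return EX_SVC.invokeAll(callables).stream()
            .map(fut -> {
                try {
                    return fut.get();
                } catch (ExecutionException | InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }).collect(Collectors.toList());
}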

Solution 3:[3]

I doubt that there is much difference in performance, but even if you proved that the second style was quicker, I would still prefer to see and use the first style in code I had to maintain.

The first multi-map style is easier for others to understand, easier to maintain, and easier to debug, for example by adding peek stages at any point in the processing chain.

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
    .map(gson::toJson)
    // easy to make changes for debug, moving peek up/down
    // .peek(System.out::println)
    .map(Document::parse)
    // easy to filter:
    // .filter(this::somecondition)
    .map(InsertOneModel::new)
    .toList();

If your requirements change, such as needing to filter the output or to capture the intermediate data by splitting into two collections, the first approach beats the second every time.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 FLUXparticle
Solution 2
Solution 3