Parallel Stream Support
- Parallel streams in Java with a custom ForkJoinPool
Parallel streams are processed in the common pool by default. This is fine in many cases, but it can be a problem because there is no control over what else is running in the common pool. For example, an external library might use the common pool for I/O-intensive tasks and thereby prevent the performance-critical parallel streams of your application from executing. The stream API does not offer a mechanism to process parallel streams in a user-defined thread pool.
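The following sketch illustrates the default behavior using only the plain JDK. It checks which threads process the elements of an ordinary parallel stream: each element is handled either by the calling thread or by a worker of the common pool. The class and method names (CommonPoolDemo, runsInCommonPool) are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

public class CommonPoolDemo {

    // Returns true if every element of a default parallel stream was processed
    // either by the calling thread or by a worker thread of the common pool.
    static boolean runsInCommonPool() {
        Thread caller = Thread.currentThread();
        Set<Boolean> checks = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 100_000).parallel().forEach(i -> {
            Thread t = Thread.currentThread();
            boolean commonWorker = t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == ForkJoinPool.commonPool();
            checks.add(t == caller || commonWorker);
        });
        // A single "false" would mean some element ran in a different pool
        return !checks.contains(false);
    }

    public static void main(String[] args) {
        System.out.println(runsInCommonPool()); // true
    }
}
```

Because any other code in the JVM can submit work to the same common pool, these worker threads are shared with everything else that uses it.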
This library closes this gap and offers parallel streams in dedicated ForkJoinPools. Like the standard stream API, this library offers streams for Java objects and for the primitive types int, long and double.
Note that the amount of splitting is still governed by the common pool. In the java.util.stream.AbstractTask class, the LEAF_TARGET field determines how much splitting is done, which in turn determines how much parallelism can be achieved. The value of this field is based on ForkJoinPool.getCommonPoolParallelism(), which uses the parallelism of the common pool, not that of whatever pool happens to be running the tasks. As a result, a dedicated pool with a higher parallelism than the common pool may not be fully utilized.
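To make this concrete, the sketch below reproduces the split-size calculation as we understand it from the JDK 8 sources of java.util.stream.AbstractTask (an assumption; other JDK versions may differ): LEAF_TARGET is the common pool's parallelism shifted left by two, and tasks keep splitting only while their size estimate exceeds roughly sizeEstimate / LEAF_TARGET. The class and method names (LeafTargetSketch, suggestedLeafSize) are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;

public class LeafTargetSketch {

    // ASSUMPTION: mirrors the JDK 8 AbstractTask logic, where
    // LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2 and the
    // split threshold is sizeEstimate / LEAF_TARGET (but at least 1).
    static long suggestedLeafSize(long sizeEstimate, int commonPoolParallelism) {
        long leafTarget = (long) commonPoolParallelism << 2;
        long est = sizeEstimate / leafTarget;
        return est > 0L ? est : 1L;
    }

    public static void main(String[] args) {
        ForkJoinPool custom = new ForkJoinPool(64);
        try {
            long size = 1_000_000L;
            int commonParallelism = ForkJoinPool.getCommonPoolParallelism();
            System.out.println("common pool parallelism: " + commonParallelism);
            System.out.println("custom pool parallelism: " + custom.getParallelism());
            // The threshold is derived from the common pool, even inside the custom pool
            System.out.println("threshold based on common pool: "
                + suggestedLeafSize(size, commonParallelism));
            System.out.println("threshold if the custom pool were used: "
                + suggestedLeafSize(size, custom.getParallelism()));
        } finally {
            custom.shutdown();
        }
    }
}
```

On a machine where the common pool has a parallelism of 3, a stream of one million elements would stop splitting at chunks of about 83,333 elements, far coarser than a 64-thread pool could exploit.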
How to Use
Version Compatibility
Each major version of this library supports one or more major versions of the JDK:
Library Version | JDK Version(s) | Comment |
---|---|---|
1.x.x | JDK 8 | Covers all methods of Stream and its primitive variants. |
2.x.x | JDKs 9-14 | Adds support for takeWhile() and dropWhile() |
Dependencies
The Parallel Stream Support library is available on Maven Central, so no further repository configuration is required.
```xml
<dependencies>
  <dependency>
    <groupId>com.github.ferstl</groupId>
    <artifactId>parallel-stream-support</artifactId>
    <version>2.0.0</version>
  </dependency>
</dependencies>
```
Creating Parallel Streams
To create a parallel stream, instantiate a ForkJoinPool and call one of the static factory methods of ParallelStreamSupport, ParallelIntStreamSupport, ParallelLongStreamSupport or ParallelDoubleStreamSupport. The factory methods are based on:
- The static factory methods of the Stream interface or its primitive variants
- The static factory methods of StreamSupport
- Collection.parallelStream()
- Arrays.stream()
Take a look at the Javadoc for a complete overview of the factory methods.
Example 1: Calculate the average weight of Elements
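```java
public double getAverageWeight(Collection<Element> elements) {
    // NR_OF_THREADS is the desired parallelism of the dedicated pool
    ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
    try {
        return ParallelStreamSupport.parallelStream(elements, pool)
            .filter(Element::isAvailable)
            .mapToDouble(Element::getWeight)
            .average()
            .orElse(0.0);
    } finally {
        // Release the pool's worker threads once the terminal operation has completed
        pool.shutdown();
    }
}
```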
Example 2: Find a particular number in a random array
```java
public void find42(int[] randomArray) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
        ParallelIntStreamSupport.parallelStream(randomArray, pool)
            .filter(i -> i == 42)
            .findAny()
            // Fails if the array does not contain 42
            .orElseThrow(IllegalStateException::new);
    } finally {
        pool.shutdown();
    }
}
```
FAQ
Q: How does it work?
A: The mechanism is pretty simple. A terminal operation on a parallel stream detects whether it is executed within a ForkJoinPool. If it is not, execution is handed over to the common pool. If the operation was already started in a ForkJoinPool, it uses that pool to complete the operation. This library relies on this behavior and makes sure that terminal operations are started as a task in the user-defined ForkJoinPool.
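The behavior described above can be reproduced with the plain JDK. The sketch below submits a whole stream pipeline as a task to a dedicated ForkJoinPool and records, for each element, whether it was processed by a worker of that pool. This is an illustration of the underlying JDK mechanism, not this library's implementation; the class and method names (CustomPoolSketch, sumInPool) are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomPoolSketch {

    // Starts the terminal operation (sum) as a task of `pool`. Since the stream
    // framework is then already running inside a ForkJoinPool, it forks its
    // subtasks into that same pool instead of the common pool.
    static long sumInPool(List<Integer> numbers, ForkJoinPool pool, Set<Boolean> inCustomPool) {
        return pool.submit(() -> numbers.parallelStream()
                .mapToLong(n -> {
                    // getPool() returns the pool hosting the current thread, or null
                    inCustomPool.add(ForkJoinTask.getPool() == pool);
                    return n.longValue();
                })
                .sum())
            .join();
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 10_000).boxed().collect(Collectors.toList());
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            Set<Boolean> inCustomPool = ConcurrentHashMap.newKeySet();
            System.out.println(sumInPool(numbers, pool, inCustomPool)); // 50005000
            System.out.println(inCustomPool);
        } finally {
            pool.shutdown();
        }
    }
}
```

Writing pool.submit(...).join() around every terminal operation quickly becomes repetitive, which is the boilerplate this library hides behind the standard stream interfaces.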
Q: Does this library also support sequential streams?
A: Yes. Just call sequential() on the stream and it will be processed within the calling thread. When created, all streams of this library are configured to be parallel.
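The sketch below shows the effect of sequential() on a plain JDK parallel stream; it assumes, as the answer above states, that streams created by this library behave the same way. The class and method names (SequentialSketch, threadsUsed) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SequentialSketch {

    // Collects the threads that process the elements after sequential() was called.
    static Set<Thread> threadsUsed(List<Integer> numbers) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        numbers.parallelStream()
            // The most recent parallel()/sequential() call decides the mode of the whole pipeline
            .sequential()
            .forEach(n -> threads.add(Thread.currentThread()));
        return threads;
    }

    public static void main(String[] args) {
        Set<Thread> threads = threadsUsed(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        // Only the calling (main) thread should appear
        System.out.println(threads.size() == 1 && threads.contains(Thread.currentThread())); // true
    }
}
```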
Q: Why is there no public constructor which takes a regular Java stream and a ForkJoinPool?
A: It is a strong principle of this library to hide the details of its implementation. The only place where you have to deal with a concrete class of this library is when a new parallel stream is created. Afterwards, you will only see the standard interfaces Stream, IntStream, LongStream and DoubleStream. Furthermore, each stream variant offers factory methods similar to those in StreamSupport, which allows you to create a parallel stream from any other stream by passing it the result of Stream.spliterator(). Should you still think an essential factory method is missing, please report an issue on this project.
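The spliterator-based conversion mentioned above can be sketched with the JDK's own StreamSupport.stream(Spliterator, boolean). The corresponding factory methods of this library additionally take a ForkJoinPool; since their exact signatures are not shown here, the sketch sticks to the JDK API. The class and method names (SpliteratorSketch, toParallel) are hypothetical.

```java
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class SpliteratorSketch {

    // Wraps an arbitrary stream's spliterator in a new stream; `true` requests parallel mode.
    // Note: onClose handlers registered on the source stream are not carried over.
    static <T> Stream<T> toParallel(Stream<T> source) {
        Spliterator<T> spliterator = source.spliterator(); // consumes `source`
        return StreamSupport.stream(spliterator, true);
    }

    public static void main(String[] args) {
        Stream<String> parallel = toParallel(Stream.of("alpha", "beta", "gamma"));
        System.out.println(parallel.isParallel()); // true
        // Encounter order of the source is preserved by the ordered collector
        System.out.println(parallel.map(String::toUpperCase).collect(Collectors.joining(","))); // ALPHA,BETA,GAMMA
    }
}
```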
How to Build
```shell
# Normal build with Javadoc and tests
mvn clean install -Pgenerate-javadoc

# Release
mvn release:prepare release:perform

# Coverage and publish to Coveralls
# (repoToken not required on Travis)
mvn clean jacoco:prepare-agent test jacoco:report coveralls:report -DrepoToken=<the-secret-token>
```