Parallel Stream Support

Parallel streams in Java with a custom ForkJoinPool.

License: MIT
GroupId: com.github.ferstl
ArtifactId: parallel-stream-support
Last Version: 2.0.0
Release Date: not listed
Type: jar
Description: Parallel Stream Support. Parallel streams in Java with a custom ForkJoinPool.
Project URL: https://github.com/ferstl/parallel-stream-support
Source Code Management: https://github.com/ferstl/parallel-stream-support

Download parallel-stream-support

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.github.ferstl/parallel-stream-support/ -->
<dependency>
    <groupId>com.github.ferstl</groupId>
    <artifactId>parallel-stream-support</artifactId>
    <version>2.0.0</version>
</dependency>

Gradle (Groovy):

// https://jarcasting.com/artifacts/com.github.ferstl/parallel-stream-support/
implementation 'com.github.ferstl:parallel-stream-support:2.0.0'

Gradle (Kotlin):

// https://jarcasting.com/artifacts/com.github.ferstl/parallel-stream-support/
implementation ("com.github.ferstl:parallel-stream-support:2.0.0")

Buildr:

'com.github.ferstl:parallel-stream-support:jar:2.0.0'

Ivy:

<dependency org="com.github.ferstl" name="parallel-stream-support" rev="2.0.0">
  <artifact name="parallel-stream-support" type="jar" />
</dependency>

Grape:

@Grapes(
@Grab(group='com.github.ferstl', module='parallel-stream-support', version='2.0.0')
)

SBT:

libraryDependencies += "com.github.ferstl" % "parallel-stream-support" % "2.0.0"

Leiningen:

[com.github.ferstl/parallel-stream-support "2.0.0"]

Dependencies

test (3)

Group / Artifact | Type | Version
org.junit.jupiter : junit-jupiter | jar | (not listed)
org.hamcrest : java-hamcrest | jar | 2.0.0.0
org.mockito : mockito-core | jar | 3.0.0

Project Modules

There are no modules declared in this project.

Parallel Stream Support

- Parallel streams in Java with a custom ForkJoinPool

Parallel streams are processed in the common pool by default. This is fine in many cases, but it becomes a problem when you have no control over what else runs in the common pool. For example, an external library might use the common pool for IO-intensive tasks and thereby keep the performance-critical parallel streams of your application from executing. The stream API offers no mechanism for processing parallel streams in a user-defined thread pool.

This library works around this limitation and offers parallel streams in dedicated ForkJoinPools. Like the standard stream API, it offers streams for Java objects and for the int, long and double primitive types.

⚠️ Running parallel streams outside of the common pool is not an officially supported feature and might stop working or behave differently in later Java releases! Furthermore, the splitting policy of parallel streams depends strongly on the configured parallelism of the common pool. See this StackOverflow answer from Stuart Marks for further details:

In the java.util.stream.AbstractTask class, the LEAF_TARGET field determines the amount of splitting that is done, which in turn determines the amount of parallelism that can be achieved. The value of this field is based on ForkJoinPool.getCommonPoolParallelism() which of course uses the parallelism of the common pool, not whatever pool happens to be running the tasks.
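
The mismatch described in the quote can be made visible with plain JDK calls. The following is a minimal sketch, not part of this library: the class name ParallelismCheck and the pool size of 16 are illustrative assumptions, and the hint about coarse splitting is only an interpretation of the quoted answer.

```java
import java.util.concurrent.ForkJoinPool;

final class ParallelismCheck {

  /** Describes how a dedicated pool's parallelism compares to the common pool's. */
  static String describe(ForkJoinPool pool) {
    int common = ForkJoinPool.getCommonPoolParallelism();
    int custom = pool.getParallelism();
    String summary = "common pool parallelism=" + common
        + ", custom pool parallelism=" + custom;
    // Interpretation of the quoted answer: splitting is tuned for the common pool,
    // so a much larger custom pool may not be kept fully busy.
    if (custom > common) {
      summary += " (splitting may be too coarse for the custom pool)";
    }
    return summary;
  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(16); // 16 is an arbitrary illustrative value
    try {
      System.out.println(describe(pool));
    } finally {
      pool.shutdown();
    }
  }
}
```

The parallelism of the common pool can be set with the system property java.util.concurrent.ForkJoinPool.common.parallelism, which is documented in the ForkJoinPool Javadoc.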

How to Use

Version Compatibility

Each major version of this library targets one or more major versions of the JDK:

Library Version | JDK Version(s) | Comment
1.x.x | JDK 8 | Covers all methods of Stream and its primitive variants.
2.x.x | JDKs 9-14 | Adds support for takeWhile() and dropWhile().

Dependencies

The Parallel Stream Support library is available on Maven Central, so no further repository configuration is required.

<dependencies>
  <dependency>
    <groupId>com.github.ferstl</groupId>
    <artifactId>parallel-stream-support</artifactId>
    <version>2.0.0</version>
  </dependency>
</dependencies>

Creating Parallel Streams

To create a parallel stream, instantiate a ForkJoinPool and call one of the static factory methods of ParallelStreamSupport, ParallelIntStreamSupport, ParallelLongStreamSupport or ParallelDoubleStreamSupport. The factory methods accept sources such as a Collection or an array, as in the examples below, and include variants similar to the factory methods of StreamSupport.

Take a look at the Javadoc for a complete overview of the factory methods.

Example 1: Calculate the average weight of Elements

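// Requires java.util.Collection, java.util.concurrent.ForkJoinPool and the library's ParallelStreamSupport.
// Element and NR_OF_THREADS are defined by the application.
public double getAverageWeight(Collection<Element> elements) {
  ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
  try {
    return ParallelStreamSupport.parallelStream(elements, pool)
        .filter(Element::isAvailable)
        .mapToDouble(Element::getWeight)
        .average()
        .orElse(0.0);
  } finally {
    // Release the worker threads once the stream has been consumed.
    pool.shutdown();
  }
}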

Example 2: Find a particular number in a random Array

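public void find42(int[] randomArray) {
  ForkJoinPool pool = new ForkJoinPool();
  try {
    // Throws IllegalStateException if the array does not contain 42.
    ParallelIntStreamSupport.parallelStream(randomArray, pool)
        .filter(i -> i == 42)
        .findAny()
        .orElseThrow(IllegalStateException::new);
  } finally {
    // Shut the pool down so its worker threads do not linger.
    pool.shutdown();
  }
}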

FAQ

Q: How does it work?

A: The mechanism is pretty simple. A terminal operation on a parallel stream detects whether it is executed within a ForkJoinPool. If it is not, the execution is handed over to the common pool. If the operation was started in a ForkJoinPool, it uses that pool to complete the operation. This library exploits this behavior and makes sure that terminal operations are started as a task in the user-defined ForkJoinPool.
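
The behavior described in this answer can be reproduced with the JDK alone. The sketch below is not the library's implementation; it only illustrates the underlying trick of starting the terminal operation as a task in a dedicated pool. The class name CustomPoolSketch and the range size are illustrative assumptions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

final class CustomPoolSketch {

  /** Starts a terminal operation as a task in the given pool and returns the pools that did the work. */
  static Set<ForkJoinPool> poolsUsed(ForkJoinPool pool) {
    Set<ForkJoinPool> used = ConcurrentHashMap.newKeySet();
    pool.submit(() ->
        IntStream.range(0, 10_000).parallel().forEach(i -> {
          // Record the pool of every worker thread that processes an element.
          Thread t = Thread.currentThread();
          if (t instanceof ForkJoinWorkerThread) {
            used.add(((ForkJoinWorkerThread) t).getPool());
          }
        })
    ).join(); // wait until the stream has been fully processed
    return used;
  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(4); // 4 is an arbitrary illustrative value
    try {
      Set<ForkJoinPool> used = poolsUsed(pool);
      System.out.println("Only the custom pool was used: "
          + (used.size() == 1 && used.contains(pool)));
    } finally {
      pool.shutdown();
    }
  }
}
```

Because the stream's terminal operation starts inside a worker thread of the dedicated pool, the subtasks it forks stay in that pool. As the warning above notes, this relies on behavior that is not officially supported.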


Q: Does this library also support sequential streams?

A: Yes. Just call sequential() on the stream and it will be processed within the calling thread. When created, all streams of this library are configured to be parallel.
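
As a small illustration of the sequential() switch, the sketch below uses a plain JDK stream as a stand-in, since the streams of this library expose the same standard interfaces. That the library's streams behave the same way is taken from the answer above, not verified here; the class name SequentialSketch is hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

final class SequentialSketch {

  /** Returns true if a stream switched to sequential mode runs only on the calling thread. */
  static boolean runsOnCallerOnly() {
    Thread caller = Thread.currentThread();
    Set<Thread> seen = ConcurrentHashMap.newKeySet();

    // Start in parallel mode, then switch back before the terminal operation.
    IntStream stream = IntStream.range(0, 1_000).parallel().sequential();
    boolean parallel = stream.isParallel();
    stream.forEach(i -> seen.add(Thread.currentThread()));

    return !parallel && seen.size() == 1 && seen.contains(caller);
  }

  public static void main(String[] args) {
    System.out.println("Processed on the calling thread only: " + runsOnCallerOnly());
  }
}
```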


Q: Why is there no public constructor which takes a regular Java stream and a ForkJoinPool?

A: Hiding implementation details is a core principle of this library. The only place where you deal with a concrete class of this library is where a new parallel stream is created. Afterwards you only see the standard interfaces Stream, IntStream, LongStream and DoubleStream. Furthermore, each stream variant offers factory methods similar to those in StreamSupport, which lets you create a parallel stream from any other stream by calling Stream.spliterator() and passing the result to such a factory method. Should you still think an essential factory method is missing, please report an issue on this project.
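
The spliterator route mentioned in this answer looks roughly as follows with the JDK's own StreamSupport. This is a sketch under assumptions: the class name SpliteratorRoute is hypothetical, and the library's spliterator-based factory methods are assumed to follow the same shape with an additional ForkJoinPool argument; consult the Javadoc for the actual signatures.

```java
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class SpliteratorRoute {

  /** Re-creates a parallel stream from an existing stream and sums the string lengths. */
  static int totalLength(Stream<String> source) {
    // spliterator() is a terminal operation; the source stream cannot be reused afterwards.
    Spliterator<String> spliterator = source.spliterator();

    // JDK factory method; 'true' requests a parallel stream (processed in the common pool here).
    return StreamSupport.stream(spliterator, true)
        .mapToInt(String::length)
        .sum();
  }

  public static void main(String[] args) {
    System.out.println(totalLength(Stream.of("a", "bb", "ccc")));
  }
}
```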

How to Build

# Normal build with Javadoc and tests
mvn clean install -Pgenerate-javadoc

# Release
mvn release:prepare release:perform

# Coverage and publish to Coveralls
# (repoToken not required on Travis)
mvn clean jacoco:prepare-agent test jacoco:report coveralls:report -DrepoToken=<the-secret-token>

Versions

Version
2.0.0
1.0.1
1.0.0