MicroProfile Reactive Streams Operators
Rationale
Reactive Streams is an integration SPI: it allows two different libraries that provide asynchronous streaming to stream data to and from each other.
Reactive Streams is not, however, designed to be used directly by application developers. The semantics defined by Reactive Streams are very strict and non-trivial to implement correctly, particularly in the area of thread safety. Typically, application developers are expected to use third-party libraries that provide the tools necessary to manipulate and control streams. Examples include Akka Streams, RxJava and Reactor.
MicroProfile, however, cannot depend on third-party libraries for this essential, application-developer-facing functionality. MicroProfile specifications are not allowed to depend on anything other than the JDK and other MicroProfile specifications.
Hence, for MicroProfile to provide application developer APIs that use Reactive Streams, MicroProfile must provide its own Reactive Streams manipulation and control library. In future, it is hoped that this library will be adopted by the JDK itself, after a period of suitable incubation in the MicroProfile project. Some JDK maintainers have indicated that this would be a suitable path to get this type of functionality into the JDK.
Influences and History
The naming and scope of this API are inspired by the JDK8 java.util.stream API. The JDK8 stream API, however, does not capture all the typical functionality that an application developer using Reactive Streams would need. For additional API naming and scope, Akka Streams, RxJava and Reactor have been used as inspiration.
Implementations
MicroProfile Reactive Streams does not contain an implementation itself but only provides the specified API, a TCK and documentation.
The following implementations are available:
- Zero Dependency - intended as a possible reference implementation for when this is proposed to the JDK, but not the reference implementation for MicroProfile as MicroProfile does not have reference implementations.
- SmallRye - based on RxJava 2 and Eclipse Vert.x.
Design
MicroProfile Reactive Streams offers a series of builders for creating instances of Reactive Streams `Publisher`, `Subscriber` and `Processor`. Each `Subscriber` is associated with a `CompletionStage` for the result of consuming the stream. This may be the result of a reduction on the elements, or in some cases it just indicates the termination of the stream, either as a success or with an error.
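As a rough illustration of a subscriber paired with a `CompletionStage`, the sketch below uses only the JDK 9+ `java.util.concurrent.Flow` interfaces so that it runs without any dependencies. It is not the MicroProfile builder API; `SummingSubscriber` and `sumOf` are hypothetical names introduced here for illustration. The stage completes with the reduction result on normal termination and completes exceptionally on error.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Illustrative only: JDK Flow interfaces, not the MicroProfile builders.
final class SummingSubscriber implements Flow.Subscriber<Integer> {
    private final CompletableFuture<Integer> result = new CompletableFuture<>();
    private int sum; // only touched from subscriber callbacks, which are signalled serially

    /** The stage that is redeemed when the stream terminates. */
    CompletionStage<Integer> completion() {
        return result;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // no back pressure in this sketch
    }

    @Override
    public void onNext(Integer item) {
        sum += item; // the reduction step
    }

    @Override
    public void onError(Throwable failure) {
        result.completeExceptionally(failure); // failed termination
    }

    @Override
    public void onComplete() {
        result.complete(sum); // successful termination carries the reduction result
    }

    /** Streams the items through a JDK publisher and waits for the result. */
    static int sumOf(List<Integer> items) {
        SummingSubscriber subscriber = new SummingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items have been delivered
        return subscriber.completion().toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(sumOf(List.of(1, 2, 3))); // prints 6
    }
}
```

With the MicroProfile builders, the subscriber and its completion stage are created together by the library rather than written by hand as above.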
The API builds a graph from a series of graph stages, which are provided as an SPI. A Reactive Streams engine, which is implemented by implementations of the specification, and can also manually be provided by end users, is responsible for building the graph into a running stream.
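The separation between describing a graph and running it can be pictured with a deliberately simplified, synchronous toy model. Everything in this sketch (`ToyGraph`, `Stage`, `MapStage`, `FilterStage`, `run`) is hypothetical and is not the MicroProfile SPI; a real engine would build an asynchronous Reactive Streams pipeline rather than loop over lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical toy model: none of these types belong to the MicroProfile SPI.
final class ToyGraph {
    /** Marker for a stage description; it holds data only and runs nothing. */
    interface Stage {}

    static final class MapStage implements Stage {
        final Function<Object, Object> mapper;

        MapStage(Function<Object, Object> mapper) {
            this.mapper = mapper;
        }
    }

    static final class FilterStage implements Stage {
        final Predicate<Object> predicate;

        FilterStage(Predicate<Object> predicate) {
            this.predicate = predicate;
        }
    }

    /** Toy "engine": interprets the stage list against an in-memory source. */
    static List<Object> run(List<?> source, List<Stage> stages) {
        List<Object> current = new ArrayList<>(source);
        for (Stage stage : stages) {
            List<Object> next = new ArrayList<>();
            for (Object element : current) {
                if (stage instanceof MapStage) {
                    next.add(((MapStage) stage).mapper.apply(element));
                } else if (stage instanceof FilterStage) {
                    if (((FilterStage) stage).predicate.test(element)) {
                        next.add(element);
                    }
                } else {
                    throw new IllegalArgumentException("Unsupported stage: " + stage);
                }
            }
            current = next;
        }
        return current;
    }

    public static void main(String[] args) {
        // The graph is plain data until an engine interprets it.
        List<Stage> graph = Arrays.asList(
                new FilterStage(x -> (Integer) x % 2 == 0),
                new MapStage(x -> (Integer) x * 10));
        System.out.println(run(Arrays.asList(1, 2, 3, 4), graph)); // prints [20, 40]
    }
}
```

Because the stages are only descriptions, a different engine could interpret the same list in a different way, which is the property the design relies on.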
Reactive Streams is available in JDK9 in the `java.util.concurrent.Flow` API. However, MicroProfile is not ready to require a baseline JDK version above 8. For this reason, the same interfaces provided by https://www.reactive-streams.org in the `org.reactivestreams` package are used instead. This does introduce a dependency from MicroProfile on a third-party library. However, that library is nothing more than the four Reactive Streams interfaces (`Publisher`, `Subscriber`, `Subscription` and `Processor`), and these have been copied as is into JDK9. As an interim solution while JDK9 adoption catches up, this has been deemed an acceptable exception to the rule. The approach documented in MicroProfile Approach to Reactive for ensuring a smooth transition to the JDK9 APIs has been adopted.
Building
The whole MicroProfile Reactive Streams project can be built via Apache Maven.
$> mvn clean install
Diagrams
The API documentation makes extensive use of marble diagrams to visualize what each operator does. These diagrams are generated using a DSL written specifically for this purpose, rather than created in a visual diagram editor, which would make it very difficult to ensure consistency, especially between different contributors.
The DSL is implemented in JavaScript, and the diagrams are created using SVG. This combination of technologies allows rapid development of the diagrams, since they can be run, inspected, debugged and tweaked directly in a web browser. They are then exported to PNG files using Puppeteer, a high level JavaScript API for controlling headless Chrome, running on node.
Editing diagrams
All the diagrams are declared in `mp-rs-ops-marbles.js`. This contains a map of all the graphs. Each graph has an array of stages, and stages have zero to many elements. The stages are as follows:
- `ins` - An input stream. Called `ins` because `in` is a JavaScript keyword, and `ins` makes the width 3 characters for nice alignment with other stages. The first parameter may optionally be a map of options; the only option currently read is `label`, which is used to label the stage.
- `sub` - A sub stream. This will automatically have a label generated for it, `stream[n]`, where `n` is the number of the sub stage, starting from one.
- `out` - An output stream. The difference between `out` and `ins` is that `out` marbles inherit their colour from the previous `ins` marble.
- `op` - The operator stage. The first argument must be the operator; this may be followed by zero to many intermediate value marbles.
- `eff` - An effect stream. Used to visualize side effects (e.g. callbacks executed by `forEach`).
- `res` - A result stage. This displays the result that gets redeemed by a `CompletionSubscriber` when it completes.
Each of the stages supports zero to many marbles, which are declared as follows:
- `n` - an `onNext` marble. A regular element passed through the stream. The colour is automatically computed: each marble in an `ins` stream gets a new colour, while all other marbles have their colour selected based on the previous `ins` marble, searching up vertically first, then left. May optionally take an options argument, supporting a `link` option which indicates that this marble should be linked to another marble (e.g. as a feedback loop from the output back to the input).
- `term` - an `onComplete` marble, indicating successful termination of the stream.
- `nterm` - a hybrid between `n` and `term`, allowing a marble to also carry a termination signal. This is used to make it clear that when a certain marble is emitted, that is the last marble and the stream will be terminated immediately.
- `err` - an `onError` marble, indicating failed termination of the stream.
- `e` - a side effect. Should usually contain an example callback invocation.
- `i` - an intermediate value. This is used for operators that compute intermediate values that are fed back into the operator callbacks with the next element.
- `r` - a result value. This is used with the `res` stage, to indicate the value that gets redeemed by the result `CompletionSubscriber`.
- `none` - No marble. Inserts a blank space between marbles. Needed to ensure alignment of cause and effect marbles.
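The colour rule for `n` marbles can be read as a small grid search. The sketch below is one possible interpretation, written in Java purely for illustration: the real DSL is JavaScript, its actual lookup may differ, and `MarbleColours`, `assign` and `inherit` are hypothetical names. It treats each stage as a row and each marble position as a column, gives every marble in an `ins` row a fresh colour, and lets every other marble look up its own column for an `ins` marble before moving one column to the left.

```java
import java.util.Arrays;

// Hypothetical model of the colour rule; the real JavaScript DSL may differ.
final class MarbleColours {
    /**
     * @param insRow  insRow[r] is true when row r is an ins stage
     * @param present present[r][c] is true when row r has a marble in column c
     * @return a colour index per cell, or -1 where there is no marble or no source
     */
    static int[][] assign(boolean[] insRow, boolean[][] present) {
        int rows = present.length;
        int[][] colour = new int[rows][];
        int nextColour = 0;
        // Pass 1: every marble in an ins row gets a fresh colour.
        for (int r = 0; r < rows; r++) {
            colour[r] = new int[present[r].length];
            Arrays.fill(colour[r], -1);
            if (insRow[r]) {
                for (int c = 0; c < present[r].length; c++) {
                    if (present[r][c]) {
                        colour[r][c] = nextColour++;
                    }
                }
            }
        }
        // Pass 2: every other marble inherits from the nearest ins marble.
        for (int r = 0; r < rows; r++) {
            if (insRow[r]) {
                continue;
            }
            for (int c = 0; c < present[r].length; c++) {
                if (present[r][c]) {
                    colour[r][c] = inherit(insRow, present, colour, r, c);
                }
            }
        }
        return colour;
    }

    /** Searches up the current column first, then repeats one column to the left. */
    private static int inherit(boolean[] insRow, boolean[][] present,
                               int[][] colour, int row, int col) {
        for (int c = col; c >= 0; c--) {
            for (int r = row - 1; r >= 0; r--) {
                if (insRow[r] && c < present[r].length && present[r][c]) {
                    return colour[r][c];
                }
            }
        }
        return -1; // no ins marble above or to the left
    }

    public static void main(String[] args) {
        boolean[] insRow = {true, false};
        boolean[][] present = {
            {true, false, true},  // ins row with a gap (a none marble) in the middle
            {false, true, true},  // out row
        };
        System.out.println(Arrays.deepToString(assign(insRow, present)));
    }
}
```

In this example the middle `out` marble has no `ins` marble directly above it, so it takes the colour of the `ins` marble one column to the left.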
After editing or creating a new diagram, you can test your changes by opening them in a browser. Before you do this for the first time, you need to run `npm install` in the `api` module to ensure the JavaScript dependencies are installed. This will be done automatically if you run `mvn process-resources` (or any lifecycle phase after `process-resources`, such as `compile` or `install`): the build uses the Maven frontend plugin to install Node, install npm, then run npm to install the dependencies. The dependencies include an installation of Chromium, which is used to generate the PNG diagrams for inclusion in the javadocs; this may take a while to download.
Once the dependencies are installed, you can open `api/src/docs/js/index.html`, which shows all the rendered diagrams. No generation step is required to view these diagrams; you can simply hit refresh in the browser after making any changes.
Generating diagrams
We convert the diagrams to SVG, then to PNG, by using Puppeteer, a high level API on top of Chrome running in headless mode. The SVG diagrams are generated in Chrome and then screenshotted to create the PNGs. Puppeteer does this automatically. However, we can’t run this as part of the regular build because the MicroProfile CI and release server does not have the necessary dependencies to run Chrome. We’ve investigated a variety of alternatives, including different strategies for generating the diagrams, but nothing viable has come up. Unfortunately, installing shared libraries in the Eclipse CBI is too high a maintenance burden for the Eclipse CBI maintainers, so they’ve declined to do it. Consequently, we need to check the diagrams into git, which means that whenever they are changed, they need to be manually regenerated.
To generate the diagrams, run:
mvn -Pmarble-diagrams clean package
The diagrams will be saved to `api/src/main/java/org/eclipse/microprofile/reactive/streams/doc-files`. From there they can be included in the javadocs using an image tag, e.g.:
<img src="doc-files/map.png" alt="map marble diagram">
Make sure to include the `alt` text; the CI build will fail if it’s not there.
You can then view the diagrams in the API docs by opening `api/target/apidocs/index.html` and navigating to the class that you added the marble diagram to.
Before committing your changes, make sure to use the above command to generate the diagrams, and then check the results into git, including the updated `marble-diagram-hashes.json` file. As part of the verification of the build, a task checks that the hashes of all the input and output files of the diagram generation process match the hashes recorded when the diagrams were last generated. If you skip this step, the build will fail in CI and the PR will not pass validation.
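The idea behind the hash verification can be sketched as follows. This is not the project's actual build task: the format of `marble-diagram-hashes.json` and the hash algorithm are not described here, so the sketch assumes SHA-256 and uses in-memory maps in place of the JSON file and the files on disk. `DiagramHashCheck`, `sha256Hex` and `staleFiles` are hypothetical names.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; the real task, file format and algorithm may differ.
final class DiagramHashCheck {
    /** Hex-encoded SHA-256 of the given content (SHA-256 is an assumption). */
    static String sha256Hex(byte[] content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(content)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    /**
     * Compares recorded hashes with the current file contents.
     *
     * @param recorded file name to hash, as recorded at the last generation
     * @param current  file name to current content
     * @return sorted names of files that were added, removed or changed
     */
    static List<String> staleFiles(Map<String, String> recorded, Map<String, byte[]> current) {
        Set<String> names = new TreeSet<>(recorded.keySet());
        names.addAll(current.keySet());
        List<String> stale = new ArrayList<>();
        for (String name : names) {
            String expected = recorded.get(name);
            byte[] content = current.get(name);
            if (expected == null || content == null || !expected.equals(sha256Hex(content))) {
                stale.add(name);
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        byte[] original = {1, 2, 3};
        Map<String, String> recorded = Map.of("map.png", sha256Hex(original));
        // An unchanged file passes; an edited file is reported as stale.
        System.out.println(staleFiles(recorded, Map.of("map.png", original)));
        System.out.println(staleFiles(recorded, Map.of("map.png", new byte[] {9})));
    }
}
```

A non-empty result corresponds to the situation in which the diagrams need to be regenerated and committed again.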
Contributing
Do you want to contribute to this project? Find out how you can help here.