Reactive Input Output objects for Java:
- Fine-tuned: fast or memory-efficient
- RS-TCK compatible (see tests for publishers and subscribers)
- Just one dependency: the only compile dependency is the JCTools library with concurrent queues
Install
Add the Maven dependency to pom.xml:
<dependency>
<groupId>org.cqfn</groupId>
<artifactId>rio</artifactId>
<version><!-- see latest release --></version>
</dependency>
Snapshots are available at the https://central.artipie.com/cqfn/maven Maven repo.
Usage
The primary protocol for all objects is Publisher<ByteBuffer>; it is either accepted or provided by various methods in the main entry points.
Files
To create a new reactive file instance, use the constructor:
var file = new File(Paths.get("/tmp/my/file.txt"));
A File instance provides multiple overloaded content() methods for reading; all of them return Publisher<ByteBuffer>. It's possible to specify a Buffers allocation strategy and an ExecutorService for subscriber callbacks. By default, the content() method allocates 8KB buffers for each read and performs Subscriber calls on the same thread as the IO reader task.
For writing, File has overloaded write(Publisher<ByteBuffer>) methods, where the user can specify the file's OpenOptions and WriteGreed (see the "Greed" section for more details); by default the WriteGreed is (3,1). The write() methods return a CompletionStage instance that can be used to handle the completion signal and errors, and to perform cancellation.
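Handling the result of a write can be sketched with plain JDK types. The snippet below does not call RIO: a CompletableFuture stands in for the CompletionStage returned by write(), and the names WriteResultDemo, describe and cancel are hypothetical. It also assumes that cancellation is requested through toCompletableFuture().cancel(), which the text above does not spell out.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class WriteResultDemo {
    /** Maps the outcome of a write stage to a readable status string. */
    static CompletionStage<String> describe(CompletionStage<Void> write) {
        return write.handle(
            (nothing, err) -> err == null ? "written" : "failed: " + err.getMessage()
        );
    }

    /** Assumption: cancellation goes through the CompletableFuture view of the stage. */
    static boolean cancel(CompletionStage<Void> write) {
        return write.toCompletableFuture().cancel(true);
    }

    public static void main(String[] args) {
        // Stand-ins for the stage returned by write(); not real RIO calls.
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        System.out.println(describe(ok).toCompletableFuture().join());

        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IOException("disk full"));
        System.out.println(describe(failed).toCompletableFuture().join());

        CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println(cancel(pending));
    }
}
```

Using handle() keeps success and failure in one callback; whenComplete() or exceptionally() work equally well when only one side matters.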
Examples:
Copy one file to another using 1KB buffer chunks:
var destination = new File(Paths.get("out.txt"));
var source = new File(Paths.get("in.txt"));
destination.write(source.content(Buffers.1K));
Calculate the SHA-256 of a file reactively (with the RxJava Flowable reducer):
var sha256 = Flowable.fromPublisher(new File(Paths.get("target")).content())
  .reduceWith(
    () -> MessageDigest.getInstance("SHA-256"),
    (digest, buf) -> {
      digest.update(buf);
      return digest;
    }
  ).map(MessageDigest::digest).blockingGet();
Channels
RIO has two wrappers for channels from java.nio:
- WritableChannel as a reactive wrapper for WritableByteChannel
- ReadableChannel as a reactive wrapper for ReadableByteChannel
WritableChannel accepts any instance of the WritableByteChannel interface and exposes write(Publisher<ByteBuffer>) method overloads, which accept a publisher of byte buffers to write into the channel. They return CompletionStage<Void> for completion and error events, and for cancellation.
ReadableChannel wraps ReadableByteChannel and exposes overloaded read() methods that return a Publisher<ByteBuffer> read from the channel.
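To make the shape of the data concrete, here is a plain-JDK, blocking illustration of reading a ReadableByteChannel in fixed-size ByteBuffer chunks, which is the kind of sequence a Publisher<ByteBuffer> would carry. It is not RIO's implementation and does not use its classes; ChunkedReadDemo and readChunks are hypothetical names, and the sketch assumes a blocking channel.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

final class ChunkedReadDemo {
    /** Reads the whole channel into buffers of at most chunkSize bytes each. */
    static List<ByteBuffer> readChunks(ReadableByteChannel channel, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<ByteBuffer> chunks = new ArrayList<>();
        try {
            while (true) {
                // A fresh buffer per read, like one allocation per read request.
                ByteBuffer buf = ByteBuffer.allocate(chunkSize);
                int read = channel.read(buf);
                if (read < 0) {
                    break; // end of channel
                }
                if (read > 0) {
                    buf.flip(); // switch the buffer from filling to draining
                    chunks.add(buf);
                }
            }
        } catch (IOException err) {
            throw new UncheckedIOException(err);
        }
        return chunks;
    }

    public static void main(String[] args) {
        // An in-memory channel keeps the example self-contained.
        ReadableByteChannel channel =
            Channels.newChannel(new ByteArrayInputStream(new byte[20]));
        for (ByteBuffer chunk : readChunks(channel, 8)) {
            System.out.println(chunk.remaining() + " bytes");
        }
    }
}
```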
Streams
Reactive wrappers for the old Java IO streams API are similar to channels:
- ReactiveInputStream to wrap InputStreams and expose read() methods that return Publisher<ByteBuffer>
- ReactiveOutputStream to wrap OutputStreams and provide write(Publisher<ByteBuffer>) methods
Configuration
Buffers
RIO provides the Buffers interface to specify a buffer allocation strategy; usually an instance of this interface is accepted by reading methods. It provides new ByteBuffers when a new read request is performed. Some standard strategies can be found in the Buffers.Standard enum; the default for all methods is the 8KB standard enum value. Since Buffers is an interface, it's possible to implement a custom allocation strategy in client code.
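A custom allocation strategy might look like the sketch below. The method that RIO's Buffers interface actually declares is not shown in this document, so the single-method BufferStrategy interface here is a hypothetical stand-in, as are CustomBuffersDemo and DirectBuffers; adapt the method name and signature to the real interface.

```java
import java.nio.ByteBuffer;

final class CustomBuffersDemo {
    /** Hypothetical stand-in for an allocation strategy; not RIO's Buffers interface. */
    interface BufferStrategy {
        ByteBuffer create();
    }

    /** Allocates a fresh off-heap buffer of a fixed size for every read request. */
    static final class DirectBuffers implements BufferStrategy {
        private final int size;

        DirectBuffers(int size) {
            if (size <= 0) {
                throw new IllegalArgumentException("size must be positive");
            }
            this.size = size;
        }

        @Override
        public ByteBuffer create() {
            return ByteBuffer.allocateDirect(this.size);
        }
    }

    public static void main(String[] args) {
        BufferStrategy strategy = new DirectBuffers(16 * 1024);
        ByteBuffer buf = strategy.create();
        System.out.println(buf.capacity() + " bytes, direct=" + buf.isDirect());
    }
}
```

Direct buffers can avoid an extra copy for channel IO but cost more to allocate, so a strategy like this trades allocation speed for transfer speed.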
Greed
To fine-tune the speed or memory usage of writes, the client can configure the WriteGreed level. It configures how many buffers to request and when. By default it's (3,1): it requests 3 buffers at the beginning and requests 3 more each time the next-to-last buffer of the current batch is received:
[subscribe] || [write] | [write] | [write] || [write] | [write] | [write] ||
     req(3) ||         |  req(3) |         ||         |  req(3) |         ||
To consume less memory, WriteGreed.SINGLE can be used; it requests one buffer at the beginning and one after each write operation. These values can be configured via the second parameter of the overloaded write methods:
file.write(path, new WriteGreed.Constant(10, 2)) // request 10 buffers when read 8
or via system properties for the default implementation:
# request 10 buffers when read 8 (only for default write method)
java -Drio.file.write.greed.amount=10 -Drio.file.write.greed.shift=2
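The (amount, shift) request policy described in this section can be sketched as a small subscriber. This is one reading of the description and the diagram, not RIO's WriteGreed code: it uses java.util.concurrent.Flow as a stand-in for the Reactive Streams interfaces, and GreedDemo, GreedySubscriber and trace are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

final class GreedDemo {
    /** Requests `amount` up front, then again after `amount - shift` items of each batch. */
    static final class GreedySubscriber implements Flow.Subscriber<ByteBuffer> {
        private final int amount;
        private final int shift;
        private final Consumer<ByteBuffer> sink;
        private Flow.Subscription subscription;
        private int position; // items received in the current batch

        GreedySubscriber(int amount, int shift, Consumer<ByteBuffer> sink) {
            if (amount < 1 || shift < 0 || shift >= amount) {
                throw new IllegalArgumentException("need amount >= 1 and 0 <= shift < amount");
            }
            this.amount = amount;
            this.shift = shift;
            this.sink = sink;
        }

        @Override
        public void onSubscribe(Flow.Subscription sub) {
            this.subscription = sub;
            sub.request(this.amount);
        }

        @Override
        public void onNext(ByteBuffer buf) {
            this.sink.accept(buf); // a real writer would write the buffer here
            this.position++;
            if (this.position == this.amount - this.shift) {
                this.subscription.request(this.amount); // refill before the batch runs out
            }
            if (this.position == this.amount) {
                this.position = 0; // next batch starts
            }
        }

        @Override
        public void onError(Throwable err) { }

        @Override
        public void onComplete() { }
    }

    /** Records writes and request(n) calls in the order they happen. */
    static List<String> trace(int amount, int shift, int items) {
        List<String> log = new ArrayList<>();
        GreedySubscriber sub = new GreedySubscriber(amount, shift, buf -> log.add("write"));
        sub.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) { log.add("req(" + n + ")"); }

            @Override
            public void cancel() { }
        });
        for (int i = 0; i < items; i++) {
            sub.onNext(ByteBuffer.allocate(0));
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace(3, 1, 6));  // default-like greed
        System.out.println(trace(1, 0, 2));  // one-by-one requests
    }
}
```

A larger amount keeps more buffers in flight (faster, more memory); a larger shift asks for the next batch earlier, so the writer is less likely to wait on the publisher.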