zio-interop-reactivestreams


License

License

Categories

Categories

React User Interface Web Frameworks
GroupId

GroupId

dev.zio
ArtifactId

ArtifactId

zio-interop-reactivestreams_0.23
Last Version

Last Version

1.0.3.5-RC12
Release Date

Release Date

Type

Type

jar
Description

Description

zio-interop-reactivestreams
zio-interop-reactivestreams
Project URL

Project URL

https://zio.dev
Project Organization

Project Organization

dev.zio
Source Code Management

Source Code Management

https://github.com/zio/interop-reactive-streams/

Download zio-interop-reactivestreams_0.23

How to add to project

<!-- https://jarcasting.com/artifacts/dev.zio/zio-interop-reactivestreams_0.23/ -->
<dependency>
    <groupId>dev.zio</groupId>
    <artifactId>zio-interop-reactivestreams_0.23</artifactId>
    <version>1.0.3.5-RC12</version>
</dependency>
// https://jarcasting.com/artifacts/dev.zio/zio-interop-reactivestreams_0.23/
implementation 'dev.zio:zio-interop-reactivestreams_0.23:1.0.3.5-RC12'
// https://jarcasting.com/artifacts/dev.zio/zio-interop-reactivestreams_0.23/
implementation ("dev.zio:zio-interop-reactivestreams_0.23:1.0.3.5-RC12")
'dev.zio:zio-interop-reactivestreams_0.23:jar:1.0.3.5-RC12'
<dependency org="dev.zio" name="zio-interop-reactivestreams_0.23" rev="1.0.3.5-RC12">
  <artifact name="zio-interop-reactivestreams_0.23" type="jar" />
</dependency>
@Grapes(
@Grab(group='dev.zio', module='zio-interop-reactivestreams_0.23', version='1.0.3.5-RC12')
)
libraryDependencies += "dev.zio" % "zio-interop-reactivestreams_0.23" % "1.0.3.5-RC12"
[dev.zio/zio-interop-reactivestreams_0.23 "1.0.3.5-RC12"]

Dependencies

compile (4)

Group / Artifact Type Version
ch.epfl.lamp : dotty-library_0.23 jar 0.23.0-RC1
dev.zio : zio_0.23 jar 1.0.0-RC21
dev.zio : zio-streams_0.23 jar 1.0.0-RC21
org.reactivestreams : reactive-streams jar 1.0.3

test (3)

Group / Artifact Type Version
dev.zio : zio-test_0.23 jar 1.0.0-RC21
dev.zio : zio-test-sbt_0.23 jar 1.0.0-RC21
org.reactivestreams : reactive-streams-tck jar 1.0.3

Project Modules

There are no modules declared in this project.

Interop reactive streams

CI Releases Snapshots

This library provides an interoperability layer for reactive streams.

Reactive Streams Producer and Subscriber

ZIO integrates with Reactive Streams by providing conversions from zio.stream.Stream to org.reactivestreams.Publisher and from zio.stream.Sink to org.reactivestreams.Subscriber and vice versa. Simply import import zio.interop.reactivestreams._ to make the conversions available.

Examples

First, let's get a few imports out of the way.

import org.reactivestreams.example.unicast._
import zio._
import zio.interop.reactivestreams._
import zio.stream._

val runtime = new DefaultRuntime {}

We use the following Publisher and Subscriber for the examples:

val publisher = new RangePublisher(3, 10)
val subscriber = new SyncSubscriber[Int] {
  override protected def whenNext(v: Int): Boolean = {
    print(s"$v, ")
    true
  }
}

Publisher to Stream

A Publisher used as a Stream buffers up to qSize elements. If possible, qSize should be a power of two for best performance. The default is 16.

val streamFromPublisher = publisher.toStream(qSize = 16)
runtime.unsafeRun(
  streamFromPublisher.run(Sink.collectAll[Integer])
)

Subscriber to Sink

When running a Stream to a Subscriber, a side channel is needed for signalling failures. For this reason toSink returns a tuple of Promise and Sink. The Promise must be failed on Stream failure. The type parameter on toSink is the error type of the Stream.

val asSink = subscriber.toSink[Throwable]
val failingStream = Stream.range(3, 13) ++ Stream.fail(new RuntimeException("boom!"))
runtime.unsafeRun(
  asSink.flatMap { case (errorP, sink) =>
    failingStream.run(sink).catchAll(errorP.fail)
  }
)

Stream to Publisher

val stream = Stream.range(3, 13)
runtime.unsafeRun(
  stream.toPublisher.flatMap { publisher =>
    UIO(publisher.subscribe(subscriber))
  }
)

Sink to Subscriber

toSubscriber returns a Subscriber and an IO which completes with the result of running the Sink or the error if the Publisher fails. A Sink used as a Subscriber buffers up to qSize elements. If possible, qSize should be a power of two for best performance. The default is 16.

val sink = Sink.collectAll[Integer]
runtime.unsafeRun(
  sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) => 
    UIO(publisher.subscribe(subscriber)) *> result
  }
)
dev.zio

ZIO

ZIO — Real World Functional Programming

Versions

Version
1.0.3.5-RC12
1.0.3.5-RC11