akka-stream-utils

A set of operators for akka-streams.

License: MIT
Categories: Akka Container Microservices Reactive libraries
GroupId: be.broij
ArtifactId: akka-stream-utils_2.13
Last Version: 0.0.0
Type: jar
Description: A set of operators for akka-streams.
Project Organization: broij


How to add to project

Maven:
<dependency>
    <groupId>be.broij</groupId>
    <artifactId>akka-stream-utils_2.13</artifactId>
    <version>0.0.0</version>
</dependency>

Gradle (Groovy DSL):
implementation 'be.broij:akka-stream-utils_2.13:0.0.0'

Gradle (Kotlin DSL):
implementation("be.broij:akka-stream-utils_2.13:0.0.0")

Buildr:
'be.broij:akka-stream-utils_2.13:jar:0.0.0'

Ivy:
<dependency org="be.broij" name="akka-stream-utils_2.13" rev="0.0.0">
  <artifact name="akka-stream-utils_2.13" type="jar" />
</dependency>

Grape:
@Grapes(
  @Grab(group='be.broij', module='akka-stream-utils_2.13', version='0.0.0')
)

SBT:
libraryDependencies += "be.broij" % "akka-stream-utils_2.13" % "0.0.0"

Leiningen:
[be.broij/akka-stream-utils_2.13 "0.0.0"]

Dependencies

compile (1)

Group / Artifact Type Version
org.scala-lang : scala-library jar 2.13.4

provided (2)

Group / Artifact Type Version
com.typesafe.akka : akka-actor-typed_2.13 jar 2.6.10
com.typesafe.akka : akka-stream_2.13 jar 2.6.10

test (2)

Group / Artifact Type Version
com.typesafe.akka : akka-stream-testkit_2.13 jar 2.6.10
org.scalatest : scalatest_2.13 jar 3.2.0

Project Modules

There are no modules declared in this project.


Introduction

This library provides a set of operators for akka-stream. Some of them improve on the standard operators shipped with akka-stream, whereas others implement new functionality.

Usage

The library is available on Maven Central. To use it in a project, a dependency on the chosen version of akka-stream must also be set up, since akka-stream is a provided dependency. Note that each release was tested against one particular version of akka-stream, given in the table below.

Release    Tested akka-stream version
0.0.0      2.6.10

To use the operators, one can import the provided implicit conversions and use them on the sources or flows of their choice.

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import be.broij.akka.stream.SourceExtensions._
import be.broij.akka.stream.FlowExtensions._

implicit val system = ActorSystem()

Source(List(Source(List("h", "el", "lo")), Source.empty, Source.single("w"), Source(List("orl", "d"))))
  .concatenate
  .runForeach(System.out.print)

Some of the operators, namely window and slidingWindow, have no implicit conversions available and can only be instantiated via their factory methods. This is by design: these operators were conceived as building blocks that ease the creation of new types of operators. The other operators, for which implicit conversions are available, can also be instantiated that way.

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import be.broij.akka.stream.operators.flatten.Concatenate

implicit val system = ActorSystem()

Source(List(Source(List("h", "el", "lo")), Source.empty, Source.single("w"), Source(List("orl", "d"))))
  .via(Concatenate.apply)
  .runForeach(System.out.print)

Operators

The specification of each operator provided by the library is given in the following sections. In addition, one can refer to the tests for a practical understanding of how to use each operator and to clear up any ambiguity.

DistinctKey

DistinctKey[T, K](keyOf: T => K): Flow[T, T, NotUsed]

Creates a flow filtering out the elements whose key is the same as the key of the preceding element. The function keyOf is used to extract the key of each element.

Figure (DistinctKeyExample): an example of the stream produced by applying the distinctKey operator to a stream of integers, where keyOf is the identity function.
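The DistinctKey specification can be checked against a plain-Scala model over a finite List, which runs without Akka. This is a hypothetical sketch of the specified behavior (the object and method names are illustrative), not the library's implementation:

```scala
// Hypothetical model of the DistinctKey specification, not the library's code:
// an element is dropped when its key equals the key of the element just before it.
object DistinctKeyModel {
  def distinctKey[T, K](xs: List[T])(keyOf: T => K): List[T] =
    xs match {
      case Nil => Nil
      case head :: tail =>
        // Pair each element with its predecessor; keep it only if the keys differ.
        head :: xs.zip(tail).collect { case (prev, cur) if keyOf(prev) != keyOf(cur) => cur }
    }

  def main(args: Array[String]): Unit =
    println(distinctKey(List(1, 1, 2, 2, 2, 3, 1, 1))(identity)) // prints List(1, 2, 3, 1)
}
```

Comparing with the preceding input element or with the last emitted element gives the same result here, since a dropped element always shares its key with the one before it.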

FilterConsecutives

FilterConsecutives[T](shouldFilter: (Option[T], T) => Boolean): Flow[T, T, NotUsed]

Creates a flow filtering out the elements not matching the shouldFilter predicate. The function shouldFilter is a predicate taking as parameters an Option wrapping the last element emitted (None if no element has been emitted yet) and the current element to be tested.

Figure (FilterConsecutivesExample): an example of the stream produced by applying the filterConsecutives operator to a stream of integers. The predicate makes sure the integers are emitted in ascending order.
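A plain-Scala model of FilterConsecutives over a finite List follows. It is a hypothetical sketch that reads the specification literally: an element is kept when shouldFilter(lastEmitted, current) returns true, and lastEmitted only advances when an element is emitted. The object name and the ascending predicate are illustrative assumptions.

```scala
// Hypothetical model of FilterConsecutives (assumption: true means "keep").
object FilterConsecutivesModel {
  def filterConsecutives[T](xs: List[T])(shouldFilter: (Option[T], T) => Boolean): List[T] =
    xs.foldLeft((Option.empty[T], List.empty[T])) { case ((last, acc), cur) =>
      // Only emitted elements become the new "last element emitted".
      if (shouldFilter(last, cur)) (Some(cur), cur :: acc) else (last, acc)
    }._2.reverse

  def main(args: Array[String]): Unit = {
    // Keep an integer only if it is strictly greater than the last one emitted.
    val ascending = (last: Option[Int], cur: Int) => last.forall(_ < cur)
    println(filterConsecutives(List(1, 3, 2, 4, 4, 0, 5))(ascending)) // prints List(1, 3, 4, 5)
  }
}
```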

CaptureMaterializedValues

CaptureMaterializedValues[T, M](
  implicit materializer: Materializer
): Flow[Source[T, M], Source[T, NotUsed], Source[M, NotUsed]]

Creates a flow working with streams of streams. Let us refer to the streams embedded in a stream as substreams. The flow erases the materialized value of each substream. Its own materialized value is another stream whose nth element is the materialized value of the nth substream. The materializer is used to pre-materialize each substream in order to access its materialized value.

Window

Window[T, F <: Frame[T]](implicit frameFactory: FrameFactory[T, F]): Flow[T, Seq[T], NotUsed]

Creates a flow turning streams of elements into streams of windows. Each window is a sequence of elements. The flow uses an implementation of the Frame trait that has to be provided by the user. An instance of the FrameFactory trait, frameFactory, must also be provided by the user. A frame represents a window being assembled. It defines several methods:

  • canAdd(item): False if the window is assembled and the item can’t be added, true otherwise;
  • add(item): Adds the item to the frame;
  • nonEmpty: True if the frame is not empty, false otherwise;
  • payloadSeq: Gives the content of the frame as a sequence of elements.

The frame factory also defines several methods:

  • apply(): Creates an empty frame;
  • apply(item): Creates a frame containing the given item.

To build the windows, the flow consumes the elements one after the other. It starts with an empty frame and tries to add each element it consumes to that frame. If an element can’t be added, the window the frame represents is emitted as a sequence of elements, and a new frame containing that element is created to continue windowing the stream.
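The windowing loop described above can be sketched in plain Scala over a finite sequence. This is a hypothetical model: Frame and FrameFactory are re-declared here as immutable traits (add returns a new frame) mirroring the listed methods, which may differ from the library's actual definitions, and CountFrame, a window of at most `size` elements, is an illustrative frame. Flushing the last non-empty frame at the end of the input is also an assumption.

```scala
// Hypothetical model of the Window loop; trait shapes are assumptions, not the library's.
object WindowModel {
  trait Frame[T] {
    def canAdd(item: T): Boolean
    def add(item: T): Frame[T]
    def nonEmpty: Boolean
    def payloadSeq: Seq[T]
  }

  trait FrameFactory[T] {
    def apply(): Frame[T]
    def apply(item: T): Frame[T]
  }

  // Illustrative frame: a window holds at most `size` elements.
  final case class CountFrame[T](size: Int, payloadSeq: Vector[T]) extends Frame[T] {
    def canAdd(item: T): Boolean = payloadSeq.size < size
    def add(item: T): Frame[T] = copy(payloadSeq = payloadSeq :+ item)
    def nonEmpty: Boolean = payloadSeq.nonEmpty
  }

  def countFactory[T](size: Int): FrameFactory[T] = new FrameFactory[T] {
    def apply(): Frame[T] = CountFrame[T](size, Vector.empty)
    def apply(item: T): Frame[T] = CountFrame[T](size, Vector(item))
  }

  // Emit the frame when an element doesn't fit, then start a new frame with that element.
  def window[T](xs: Seq[T])(factory: FrameFactory[T]): List[Seq[T]] = {
    val (windows, last) = xs.foldLeft((List.empty[Seq[T]], factory())) {
      case ((done, frame), item) =>
        if (frame.canAdd(item)) (done, frame.add(item))
        else (frame.payloadSeq :: done, factory(item))
    }
    // Flush the frame still being assembled when the input ends.
    (if (last.nonEmpty) last.payloadSeq :: windows else windows).reverse
  }

  def main(args: Array[String]): Unit =
    println(window(1 to 7)(countFactory[Int](3)))
}
```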

WeightedWindow

WeightedWindow[T, W: Numeric](maxWeight: W, weightOf: T => W): Flow[T, Seq[T], NotUsed]

Creates a flow working on streams where each element has an associated weight obtained with the function weightOf. Let us call $w_n$ the weight associated with the nth element of such a stream. The flow turns such a stream of elements into a stream of windows. Each window is the longest sequence of consecutive elements, kept in emission order, that starts with a given element and whose cumulative weight doesn't exceed maxWeight. The first window starts with the first element of the stream: $f_0 = 0$. Let $l_n$ be the index of the last element of the nth window. For $n \ge 1$, the index of the first element of the nth window is $f_n = l_{n-1} + 1$, and every window satisfies $\sum_{i=f_n}^{l_n} w_i \le maxWeight$.
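A plain-Scala model of WeightedWindow over a finite sequence is sketched below. It is hypothetical, not the library's implementation: windows are filled greedily, which yields the longest window starting at each $f_n$. An element heavier than maxWeight on its own is assumed to form a singleton window, a case the specification does not cover.

```scala
// Hypothetical model of WeightedWindow: consecutive windows with cumulative weight <= maxWeight.
object WeightedWindowModel {
  def weightedWindow[T, W](xs: Seq[T], maxWeight: W, weightOf: T => W)(implicit num: Numeric[W]): List[Seq[T]] = {
    val (done, current, _) = xs.foldLeft((List.empty[Seq[T]], Vector.empty[T], num.zero)) {
      case ((acc, window, total), item) =>
        val w = weightOf(item)
        // Extend the window while the running total stays within maxWeight.
        if (window.isEmpty || num.lteq(num.plus(total, w), maxWeight)) (acc, window :+ item, num.plus(total, w))
        else (window :: acc, Vector(item), w) // start window n + 1 at l_n + 1
    }
    (if (current.nonEmpty) current :: done else done).reverse
  }

  def main(args: Array[String]): Unit =
    println(weightedWindow(List(2, 3, 4, 1, 5), 6, (x: Int) => x))
}
```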

TimedWindow

TimedWindow[T](maxPeriod: FiniteDuration, timeOf: T => ZonedDateTime): Flow[T, Seq[T], NotUsed]

Creates a flow working on streams where each element has an associated timestamp obtained with the function timeOf. Let us call $t_n$ the timestamp associated with the nth element of such a stream. The flow assumes the elements are emitted in the order dictated by their timestamps: $t_n \le t_{n+1}$. It turns such a stream of elements into a stream of windows. Each window is a sequence of timestamp-ordered elements giving the set of elements whose timestamps fall within a given time interval. Let $l_n$ be the index of the last element of the nth window. The index of the first element of the nth window is $f_n = l_{n-1} + 1$. The first window starts with the first element of the stream: $f_0 = 0$. The nth window contains the elements that occurred in the time interval $[t_{f_n}, t_{f_n} + maxPeriod]$. The maxPeriod parameter thus defines the duration of the time interval covered by each window.
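The interval rule above can be sketched in plain Scala over a finite, timestamp-ordered sequence. This is a hypothetical model (object and method names are illustrative), not the library's code; it treats the interval $[t_{f_n}, t_{f_n} + maxPeriod]$ as closed, as written.

```scala
import java.time.ZonedDateTime
import scala.concurrent.duration._

// Hypothetical model of TimedWindow; input is assumed sorted by timestamp.
object TimedWindowModel {
  def timedWindow[T](xs: Seq[T], maxPeriod: FiniteDuration, timeOf: T => ZonedDateTime): List[Seq[T]] = {
    val (done, current) = xs.foldLeft((List.empty[Seq[T]], Vector.empty[T])) {
      case ((acc, window), item) =>
        // The item fits if its timestamp is not after t(f_n) + maxPeriod.
        val fits = window.headOption.forall { first =>
          !timeOf(item).isAfter(timeOf(first).plusNanos(maxPeriod.toNanos))
        }
        if (fits) (acc, window :+ item) else (window :: acc, Vector(item))
    }
    (if (current.nonEmpty) current :: done else done).reverse
  }

  def main(args: Array[String]): Unit = {
    val base = ZonedDateTime.parse("2021-01-01T00:00:00Z")
    // Elements are offsets in seconds from `base`.
    println(timedWindow(List(0, 5, 10, 11, 25, 30), 10.seconds, (s: Int) => base.plusSeconds(s)))
  }
}
```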

SlidingWindow

SlidingWindow[T, F <: Frame[T]](implicit frameFactory: FrameFactory[T, F]): Flow[T, Seq[T], NotUsed]

Creates a flow turning streams of elements into streams of windows. Each window is a sequence of elements. The flow uses an implementation of the Frame trait that has to be provided by the user. An instance of the FrameFactory trait, frameFactory, must also be provided by the user. A frame represents a window being assembled. It defines several methods:

  • canAdd(item): False if the window is assembled and the item can’t be added, true otherwise;
  • add(item): Adds the item to the frame;
  • shrink(item): Fits the frame so that the item can be added and adds it;
  • nonEmpty: True if the frame is not empty, false otherwise;
  • payloadSeq: Gives the content of the frame as a sequence of elements.

The frame factory defines a single method:

  • apply(): Creates a frame wrapping an empty window.

To build the windows, the flow consumes the elements one after the other. It starts with an empty frame and tries to add each element it consumes to that frame. If an element can’t be added, the window the frame represents is emitted as a sequence of elements, and the frame is then fitted to contain that element (see shrink) to continue windowing the stream.
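The sliding variant of the loop can be modelled the same way. In this hypothetical sketch the Frame trait is re-declared as an immutable assumption (not the library's definition), the empty frame stands in for frameFactory(), and LastN is an illustrative frame keeping the last `size` elements whose shrink drops the oldest ones before adding the item. Flushing the last frame at the end of the input is assumed.

```scala
// Hypothetical model of the SlidingWindow loop; trait shape is an assumption.
object SlidingWindowModel {
  trait Frame[T] {
    def canAdd(item: T): Boolean
    def add(item: T): Frame[T]
    def shrink(item: T): Frame[T]
    def nonEmpty: Boolean
    def payloadSeq: Seq[T]
  }

  // Illustrative frame: the last `size` elements.
  final case class LastN[T](size: Int, payloadSeq: Vector[T]) extends Frame[T] {
    def canAdd(item: T): Boolean = payloadSeq.size < size
    def add(item: T): Frame[T] = copy(payloadSeq = payloadSeq :+ item)
    // Drop just enough of the oldest elements for the item to fit, then add it.
    def shrink(item: T): Frame[T] = copy(payloadSeq = payloadSeq.drop(payloadSeq.size - size + 1) :+ item)
    def nonEmpty: Boolean = payloadSeq.nonEmpty
  }

  def slidingWindow[T](xs: Seq[T])(empty: Frame[T]): List[Seq[T]] = {
    val (done, last) = xs.foldLeft((List.empty[Seq[T]], empty)) {
      case ((acc, frame), item) =>
        if (frame.canAdd(item)) (acc, frame.add(item))
        else (frame.payloadSeq :: acc, frame.shrink(item)) // emit, then fit the frame
    }
    (if (last.nonEmpty) last.payloadSeq :: done else done).reverse
  }

  def main(args: Array[String]): Unit =
    println(slidingWindow(1 to 5)(LastN[Int](3, Vector.empty)))
}
```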

WeightedSlidingWindow

WeightedSlidingWindow[T, W: Numeric](maxWeight: W, weightOf: T => W): Flow[T, Seq[T], NotUsed]

Creates a flow working on streams where each element has an associated weight obtained with the function weightOf. Let us call $w_n$ the weight associated with the nth element of such a stream. The flow turns such a stream of elements into a stream of windows. Each window is the longest sequence of consecutive elements, kept in emission order, that starts with a given element and whose cumulative weight doesn't exceed maxWeight. The first window starts with the first element of the stream. Let $l_n$ be the index of the last element of the nth window. The (n+1)th window is fitted to include element $l_n + 1$ while repeating as many elements of the nth window as possible.
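A hypothetical plain-Scala model of WeightedSlidingWindow follows (names are illustrative, not the library's). When element $l_n + 1$ doesn't fit, the full window is emitted and its oldest elements are dropped until the new element fits, which keeps as many elements of the previous window as possible. As in the non-sliding sketch, an element heavier than maxWeight on its own is assumed to form a window by itself.

```scala
// Hypothetical model of WeightedSlidingWindow.
object WeightedSlidingWindowModel {
  def weightedSlidingWindow[T, W](xs: Seq[T], maxWeight: W, weightOf: T => W)(implicit num: Numeric[W]): List[Seq[T]] = {
    // Cumulative weight of a candidate window.
    def total(window: Vector[T]): W = window.map(weightOf).foldLeft(num.zero)(num.plus)
    val (done, current) = xs.foldLeft((List.empty[Seq[T]], Vector.empty[T])) {
      case ((acc, window), item) =>
        if (window.isEmpty || num.lteq(total(window :+ item), maxWeight)) (acc, window :+ item)
        else {
          // Drop the oldest elements until the new item fits.
          var next = window :+ item
          while (next.size > 1 && num.gt(total(next), maxWeight)) next = next.tail
          (window :: acc, next)
        }
    }
    (if (current.nonEmpty) current :: done else done).reverse
  }

  def main(args: Array[String]): Unit =
    println(weightedSlidingWindow(List(2, 3, 1, 4, 2), 6, (x: Int) => x))
}
```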

TimedSlidingWindow

TimedSlidingWindow[T](maxPeriod: FiniteDuration, timeOf: T => ZonedDateTime): Flow[T, Seq[T], NotUsed]

Creates a flow working on streams where each element has an associated timestamp obtained with the function timeOf. Let us call $t_n$ the timestamp associated with the nth element of such a stream. The flow assumes the elements are emitted in the order dictated by their timestamps: $t_n \le t_{n+1}$. It turns such a stream of elements into a stream of windows. Each window is a sequence of timestamp-ordered elements giving the set of elements whose timestamps fall within a given time interval. The first window starts with the first element of the stream. Let $f_n$ be the index of the first element of the nth window. That window contains the elements that occurred in the time interval $[t_{f_n}, t_{f_n} + maxPeriod]$. The maxPeriod parameter thus defines the duration of the time interval covered by each window. Let $l_n$ be the index of the last element of the nth window. The (n+1)th window is fitted to include element $l_n + 1$ while repeating as many elements of the nth window as possible.
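The sliding rule can be sketched in plain Scala for a finite, timestamp-ordered sequence. In this hypothetical model (illustrative names, not the library's code), the next window keeps every element of the previous one whose interval still contains element $l_n + 1$. Since timestamps are ordered, that amounts to dropping a prefix of the window.

```scala
import java.time.ZonedDateTime
import scala.concurrent.duration._

// Hypothetical model of TimedSlidingWindow; input is assumed sorted by timestamp.
object TimedSlidingWindowModel {
  def timedSlidingWindow[T](xs: Seq[T], maxPeriod: FiniteDuration, timeOf: T => ZonedDateTime): List[Seq[T]] = {
    // True when `item` lies in [t(first), t(first) + maxPeriod].
    def within(first: T, item: T): Boolean =
      !timeOf(item).isAfter(timeOf(first).plusNanos(maxPeriod.toNanos))
    val (done, current) = xs.foldLeft((List.empty[Seq[T]], Vector.empty[T])) {
      case ((acc, window), item) =>
        if (window.isEmpty || within(window.head, item)) (acc, window :+ item)
        // Emit the window, then drop the oldest elements too far from the new item.
        else (window :: acc, (window :+ item).dropWhile(first => !within(first, item)))
    }
    (if (current.nonEmpty) current :: done else done).reverse
  }

  def main(args: Array[String]): Unit = {
    val base = ZonedDateTime.parse("2021-01-01T00:00:00Z")
    println(timedSlidingWindow(List(0, 4, 8, 12, 13), 10.seconds, (s: Int) => base.plusSeconds(s)))
  }
}
```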

Reorder

Reorder[T: Ordering, F <: Frame[T]](implicit frameFactory: FrameFactory[T, F]): Flow[T, T, NotUsed]
Reorder[T: Ordering, W: Numeric](maxWeight: W, weightOf: T => W): Flow[T, T, NotUsed]
Reorder[T: Ordering](maxPeriod: FiniteDuration, timeOf: T => ZonedDateTime): Flow[T, T, NotUsed]

Creates a flow working on streams of elements for which the user provides an Ordering implementation. Using that ordering, it tries to emit the elements in ascending order. Since a stream may be infinite, it cannot be fully sorted. Instead, the flow splits the stream into windows of elements, sorts each of them and concatenates their contents in order. The ordering is thus guaranteed only within each window. To sort each window, the flow uses the default Scala sorting algorithm, which relies on java.util.Arrays.sort. Refer respectively to the Window, WeightedWindow and TimedWindow specifications for more information about the role of the parameters of each of the three builders above.
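The "sort each window, then concatenate" strategy can be illustrated with a hypothetical plain-Scala model. Fixed-size windows (Seq.grouped) stand in for any of the three windowing variants, and the model shows that order is only guaranteed within a window.

```scala
// Hypothetical model of Reorder using fixed-size windows as a stand-in for the real windowing.
object ReorderModel {
  def reorder[T: Ordering](xs: Seq[T], windowSize: Int): List[T] =
    xs.grouped(windowSize).flatMap(_.sorted).toList // sort each window, keep window order

  def main(args: Array[String]): Unit = {
    println(reorder(List(3, 1, 2, 6, 5, 4, 7), 3)) // fully sorted: disorder stays within windows
    println(reorder(List(5, 1, 4, 2, 3), 3))       // 5 precedes 2: windows are not merged
  }
}
```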

Flattening operators

These operators flatten streams: they turn streams of streams of a given abstraction into streams of that given abstraction. In the following subsections, we will refer to the streams embedded in a stream as substreams.

Concatenate

Concatenate[T]: Flow[Source[T, NotUsed], T, NotUsed]

Creates a flattening operator. The result is a flow taking the first substream and emitting all of its elements one by one in order. Then, when the substream completes, the flow takes the next substream and repeats that process. The flow completes when the stream completes and all of its substreams have been processed. It fails when the stream fails or the substream being processed fails.

MapConcatenate

MapConcatenate[T, U](mapper: T => Source[U, NotUsed]): Flow[T, U, NotUsed]

Creates a flow using the mapper function to turn each element of a stream into a substream, and then flattening all of these substreams via the concatenate operator.

Switch

Switch[T](implicit materializer: Materializer): Flow[Source[T, NotUsed], T, NotUsed]

Creates a flattening operator. The result is a flow taking the first substream and emitting all of its elements one by one in order. As soon as the next substream is available, the flow stops the substream currently being processed. When the substream being processed completes, the flow takes the next substream and repeats that process. The flow completes when the stream completes and all of its substreams have been processed. It fails when the stream fails or the substream being processed fails. The materializer is used to pre-materialize each substream.

MapSwitch

MapSwitch[T, U](mapper: T => Source[U, NotUsed])
               (implicit materializer: Materializer): Flow[T, U, NotUsed]

Creates a flow using the mapper function to turn each element of a stream into a substream, and then flattening all of these substreams via the switch operator. The materializer is used to pre-materialize each substream.

Join

Join[T](breadth: Option[BigInt]): Flow[Source[T, NotUsed], T, NotUsed]

Creates a flattening operator. The result is a flow taking the first N substreams and joining them by emitting all of their elements one by one in a FIFO fashion. When one of the substreams being joined completes, the flow takes the next substream and continues its process with that substream added to the set of substreams it joins. The flow completes when the stream completes and all of its substreams have been processed. It fails when the stream fails or one of the substreams being processed fails. The number of substreams to join at the same time is provided by the user as an optional value called breadth. When that optional value is set to None, there is no limit on the number of substreams processed simultaneously.

MapJoin

MapJoin[T, U](mapper: T => Source[U, NotUsed], breadth: Option[BigInt]): Flow[T, U, NotUsed]

Creates a flow using the mapper function to turn each element of a stream into a substream, and then flattening all of these substreams via the join operator. The number of substreams to join at the same time is provided by the user as an optional value called breadth. When that optional value is set to None, there is no limit on the number of substreams processed simultaneously.

JoinFairly

JoinFairly[T](n: BigInt, breadth: Option[BigInt]): Flow[Source[T, NotUsed], T, NotUsed]

Creates a flattening operator. The result is a flow taking the first M substreams and joining them by emitting the next n elements from the first substream being joined, followed by the next n elements from the second substream being joined, and so forth. When one of the substreams being joined completes, the flow takes the next substream and continues its process with that substream added to the set of substreams it joins. The flow completes when the stream completes and all of its substreams have been processed. It fails when the stream fails or one of the substreams being processed fails. The number of substreams to join at the same time is provided by the user as an optional value called breadth. When that optional value is set to None, there is no limit on the number of substreams processed simultaneously.
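The round-robin rule can be modelled in plain Scala under strong simplifying assumptions: every substream is a finite List whose elements are all available immediately, a substream that still has elements goes back to the end of the rotation, and a newly admitted substream joins at the end of the rotation. In a live stream, the actual order also depends on when elements become available. The model uses Int for n and breadth where the library uses BigInt, and all names are illustrative.

```scala
import scala.collection.immutable.Queue

// Hypothetical model of JoinFairly for fully available, finite substreams.
object JoinFairlyModel {
  def joinFairly[T](substreams: List[List[T]], n: Int, breadth: Option[Int]): List[T] = {
    val limit = breadth.getOrElse(Int.MaxValue)
    require(n > 0 && limit > 0, "n and breadth must be positive")
    // `active` is the rotation of joined substreams; `pending` waits for a free slot.
    @annotation.tailrec
    def loop(active: Queue[List[T]], pending: List[List[T]], out: Vector[T]): Vector[T] =
      if (active.isEmpty && pending.isEmpty) out
      else if (active.size < limit && pending.nonEmpty) loop(active.enqueue(pending.head), pending.tail, out)
      else {
        val (current, rest) = active.dequeue
        val (batch, remaining) = current.splitAt(n) // next n elements of this substream
        val next = if (remaining.nonEmpty) rest.enqueue(remaining) else rest
        loop(next, pending, out ++ batch)
      }
    loop(Queue.empty, substreams, Vector.empty).toList
  }

  def main(args: Array[String]): Unit =
    println(joinFairly(List(List(1, 2, 3, 4), List(10, 20), List(100, 200, 300)), 2, Some(2)))
}
```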

MapJoinFairly

MapJoinFairly[T, U](n: BigInt, mapper: T => Source[U, NotUsed], breadth: Option[BigInt]): Flow[T, U, NotUsed]

Creates a flow using the mapper function to turn each element of a stream into a substream, and then flattening all of these substreams via the joinFairly operator. The number of substreams to join at the same time is provided by the user as an optional value called breadth. When that optional value is set to None, there is no limit on the number of substreams processed simultaneously.

JoinWithPriorities

JoinWithPriorities[T, P: Ordering](
  priorityOf: T => P, breadth: Option[BigInt]
): Flow[Source[T, NotUsed], T, NotUsed]

Creates a flattening operator. The result is a flow taking the first N substreams and joining them by emitting all of their elements. The function priorityOf is used to attribute a priority to each element and, when several elements are available for emission, the one with the highest priority is emitted. When one of the substreams being joined completes, the flow takes the next substream and continues its process with that substream added to the set of substreams it joins. The flow completes when the stream completes and all of its substreams have been processed. It fails when the stream fails or one of the substreams being processed fails. The number of substreams to join at the same time is provided by the user as an optional value called breadth. When that optional value is set to None, there is no limit on the number of substreams processed simultaneously.

MapJoinWithPriorities

MapJoinWithPriorities[T, U, P: Ordering](
  mapper: T => Source[U, NotUsed], priorityOf: U => P, breadth: Option[BigInt]
): Flow[T, U, NotUsed]

Creates a flow using the mapper function to turn each element of a stream into a substream, and then flattening all of these substreams via the joinWithPriorities operator. The number of substreams to join at the same time is provided by the user as an optional value called breadth. When that optional value is set to None, there is no limit on the number of substreams processed simultaneously.

Diverging operators

These operators make it possible to build streaming scenarios where a dynamic set of consumers is connected to the same producer. They provide sources whose materializations are consumers emitting the elements they receive from a shared producer they register with. The job of the producer is to emit the elements of a wrapped source to the registered consumers. It manages a dynamic group of consumers that grows or shrinks as consumers register and unregister. In each of these operators, a flag called restartSource specifies how the producer should react when there are no more consumers. When the flag is set to true, the producer stops the wrapped source and restarts it from the beginning when a new consumer registers. When set to false, it lets the wrapped source continue to execute. In each diverging operator, if an element sent to a consumer isn't acknowledged to the producer within a FiniteDuration called baseTimeoutDelay, which is provided by the user, the element is sent again to that consumer. This duration is doubled each time the same element is sent again to the same consumer (exponential backoff). Note that this mechanism is completely transparent to the end user:

  • nothing more than providing that baseTimeoutDelay is expected from the user;
  • when an element is received several times by the same consumer due to retransmissions, it appears only once in the corresponding stream.
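The retransmission rule can be sketched in plain Scala. This is a hypothetical illustration, not the library's protocol: timeoutFor computes the doubling delay, and deduplicate assumes each element carries a sequence number so that a consumer can drop copies it has already delivered. Sequence numbers are an assumption made for the example; the specification only states that duplicates appear once.

```scala
import scala.concurrent.duration._

// Hypothetical sketch of exponential backoff and consumer-side de-duplication.
object RetransmissionModel {
  // Timeout after send number `attempt` of the same element (0 = initial send):
  // baseTimeoutDelay doubled once per earlier attempt.
  def timeoutFor(baseTimeoutDelay: FiniteDuration, attempt: Int): FiniteDuration =
    baseTimeoutDelay * (1L << attempt)

  // Drop retransmitted copies, assuming each element is tagged with a sequence number.
  def deduplicate[T](received: Seq[(Long, T)]): Seq[T] =
    received.foldLeft((Set.empty[Long], Vector.empty[T])) { case ((seen, out), (seq, elem)) =>
      if (seen(seq)) (seen, out) else (seen + seq, out :+ elem)
    }._2

  def main(args: Array[String]): Unit = {
    println((0 to 3).map(timeoutFor(100.millis, _)))
    println(deduplicate(List(1L -> "a", 2L -> "b", 2L -> "b", 3L -> "c")))
  }
}
```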

Anycast

Anycast[T](source: Source[T, NotUsed], restartSource: Boolean, baseTimeoutDelay: FiniteDuration)
          (implicit actorSystem: ActorSystem): Source[T, NotUsed]

Creates a diverging operator. The result is a source whose materializations are consumers registered to the same producer. The producer emits the elements of the source it wraps one after the other. Each element is sent to one of the consumers, taking the first one available or using a FIFO policy when several consumers are available. The source completes when the producer completes. It fails when the producer or any of its consumers fails.

AnycastWithPriorities

AnycastWithPriorities[T, P: Ordering](source: Source[T, NotUsed], restartSource: Boolean,
                                      baseTimeoutDelay: FiniteDuration)
                                     (implicit actorSystem: ActorSystem): AnycastWithPriorities[T, P]

Creates a diverging operator. The result is an object used to create sources whose materializations are consumers registered to the same producer. The producer emits the elements of the source it wraps one after the other. These sources are created via the method:

AnycastWithPriorities.withPriority(priority: P): Source[T, NotUsed]

Each consumer is bound to the priority that is given when the source it materializes is created. Each element is sent to one of the consumers, taking the first one available or the one with the highest priority when several are available. The created sources complete when the producer completes. They fail when the producer or any of its consumers fail.

Broadcast

Broadcast[T](source: Source[T, NotUsed], restartSource: Boolean,
             waitSlowest: Boolean, bufferSize: Int, baseTimeoutDelay: FiniteDuration)
            (implicit actorSystem: ActorSystem): Source[T, NotUsed]

Creates a diverging operator. The result is a source whose materializations are consumers registered to the same producer. The producer emits the elements of the source it wraps one after the other. Each element is sent to all of its consumers. A buffer allows the fastest consumers to advance without having to wait for the slowest ones to consume the current element. The size of the buffer is finite and provided by the user with the bufferSize parameter. Two behaviors are available when the buffer is full: wait for the slowest consumers when the user sets the flag waitSlowest to true, or follow the pace of the fastest consumers (which means that some consumers may skip some elements) when the user sets this flag to false. The source completes when the producer completes. It fails when the producer or any of its consumers fails.

Balance

Balance[T](source: Source[T, NotUsed], n: BigInt, restartSource: Boolean,
           baseTimeoutDelay: FiniteDuration)
          (implicit actorSystem: ActorSystem): Source[T, NotUsed]

Creates a diverging operator. The result is a source whose materializations are consumers registered to the same producer. The producer emits the elements of the source it wraps one after the other. Each element is sent to one of its consumers. The first n elements are sent to the first consumer, the next n elements are sent to the second consumer, and so forth in a circular fashion. The constant n is a parameter giving the number of elements to send to an individual consumer before switching to the next. The source completes when the producer completes. It fails when the producer or any of its consumers fails.
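The circular assignment above can be written as a one-line formula. This hypothetical sketch assumes a fixed set of registered consumers, numbered from 0 in registration order, and uses Long where the library uses BigInt for n. In practice, the set of consumers can change while the stream runs.

```scala
// Hypothetical model of Balance's assignment rule for a fixed number of consumers.
object BalanceModel {
  // 0-based index of the consumer receiving the i-th (0-based) element:
  // blocks of n consecutive elements go to consumers in circular order.
  def consumerOf(i: Long, n: Long, consumers: Int): Int = ((i / n) % consumers).toInt

  def main(args: Array[String]): Unit =
    println((0L until 7L).map(consumerOf(_, 2, 3)))
}
```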

Partition

Partition[T, P](source: Source[T, NotUsed], partitionOf: T => P, restartSource: Boolean,
                waitSlowest: Boolean, bufferSize: Int, baseTimeoutDelay: FiniteDuration)
               (implicit actorSystem: ActorSystem): P => Source[T, NotUsed]

Creates a diverging operator. The result is a function used to create sources whose materializations are consumers registered to the same producer. The producer emits the elements of the source it wraps one after the other. Each consumer is bound to a partition (a value of type P) that is given when the source it materializes is created. Each element is attributed to a partition using the partitioning function partitionOf. Each consumer subscribes to the partition it is bound to and receives all the elements attributed to that partition. A buffer allows the fastest consumers to advance without having to wait for the slowest ones to consume the current element. This buffer is shared by all consumers, whatever partition they are subscribed to. The size of the buffer is finite and provided by the user with the bufferSize parameter. Two behaviors are available when the buffer is full: wait for the slowest consumers when the user sets the flag waitSlowest to true, or follow the pace of the fastest consumers (which means that the slowest consumers may skip some elements) when the user sets this flag to false. The sources complete when the producer completes. They fail when the producer or any of its consumers fails.

Versions

Version
0.0.0