Flusswerk - Spring Boot Starter

Tooling for AMQP/RabbitMQ based workflow management.

License

Categories

Spring Boot, Container, Microservices

GroupId

de.digitalcollections.flusswerk

ArtifactId

spring-boot-starter-flusswerk

Last Version

3.2.0

Release Date

Type

jar

Description

Flusswerk - Spring Boot Starter
Tooling for AMQP/RabbitMQ based workflow management.
Download spring-boot-starter-flusswerk

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/de.digitalcollections.flusswerk/spring-boot-starter-flusswerk/ -->
<dependency>
    <groupId>de.digitalcollections.flusswerk</groupId>
    <artifactId>spring-boot-starter-flusswerk</artifactId>
    <version>3.2.0</version>
</dependency>

Gradle (Groovy):

// https://jarcasting.com/artifacts/de.digitalcollections.flusswerk/spring-boot-starter-flusswerk/
implementation 'de.digitalcollections.flusswerk:spring-boot-starter-flusswerk:3.2.0'

Gradle (Kotlin):

// https://jarcasting.com/artifacts/de.digitalcollections.flusswerk/spring-boot-starter-flusswerk/
implementation("de.digitalcollections.flusswerk:spring-boot-starter-flusswerk:3.2.0")

Buildr:

'de.digitalcollections.flusswerk:spring-boot-starter-flusswerk:jar:3.2.0'

Ivy:

<dependency org="de.digitalcollections.flusswerk" name="spring-boot-starter-flusswerk" rev="3.2.0">
  <artifact name="spring-boot-starter-flusswerk" type="jar" />
</dependency>

Grape:

@Grapes(
  @Grab(group='de.digitalcollections.flusswerk', module='spring-boot-starter-flusswerk', version='3.2.0')
)

SBT:

libraryDependencies += "de.digitalcollections.flusswerk" % "spring-boot-starter-flusswerk" % "3.2.0"

Leiningen:

[de.digitalcollections.flusswerk/spring-boot-starter-flusswerk "3.2.0"]

Dependencies

compile (6)

Group / Artifact Type Version
de.digitalcollections.flusswerk : dc-flusswerk-engine jar 3.2.0
javax.annotation : javax.annotation-api jar 1.3.2
javax.validation : validation-api jar 2.0.1.Final
org.springframework.boot : spring-boot-configuration-processor jar 2.2.6.RELEASE
org.springframework.boot : spring-boot-starter jar 2.2.6.RELEASE
io.micrometer : micrometer-core jar 1.4.1

test (4)

Group / Artifact Type Version
org.assertj : assertj-core jar 3.15.0
org.junit.jupiter : junit-jupiter-api jar 5.6.2
org.junit.jupiter : junit-jupiter-engine jar 5.6.2
org.springframework.boot : spring-boot-starter-test jar 2.2.6.RELEASE

Project Modules

There are no modules declared in this project.

Flusswerk - Digital Collections Workflow Engine


Flusswerk makes it easy to create multithreaded workers for read/transform/write chains (aka ETL jobs). Workflows are coordinated via RabbitMQ, so it's easy to create chains of independent workflow jobs (each a new Java application).

Maven:

<dependency>
  <groupId>com.github.dbmdz.flusswerk</groupId>
  <artifactId>framework</artifactId>
  <version>4.1.0</version>
</dependency>

Gradle:

dependencies {
    implementation group: 'com.github.dbmdz.flusswerk', name: 'framework', version: '4.1.0'
}

Getting started

To get started, clone or copy the Flusswerk Example application.

Migration to version 4

Starting with Flusswerk 4, there are two major changes:

  • Every Flusswerk application now uses Spring Boot and needs beans for FlowSpec (defining the processing) and IncomingMessageType.
  • The package names changed from de.digitalcollections.flusswerk.engine to com.github.dbmdz.flusswerk.framework.

The Big Picture

A typical Flusswerk application has four parts:

  • Messages
  • the data processing flow (usually a Reader, a Transformer and a Writer)
  • some Spring Boot glue code
  • a Spring Boot application.yml to configure all the Flusswerk things

Usually it is also useful to define your own data model classes, although that is not strictly required.

Other optional parts are

  • Custom metrics collection
  • Custom logging formats (aka ProcessReport)
  • Centralized locking

Messages

Message classes are for sending data to and receiving data from RabbitMQ. All Message classes extend Message, which automatically forwards tracing ids from incoming to outgoing messages (if you set the tracing id by hand, it will not be overwritten).

class IndexMessage extends Message {

  private final String documentId;

  public IndexMessage(String documentId) {
    this.documentId = requireNonNull(documentId);
  }

  public String getDocumentId() { ... }

  public boolean equals(Object other) { ... }

  public int hashCode() { ... }

}

Register the type for the incoming message, so it gets automatically deserialized:

@Bean
public IncomingMessageType incomingMessageType() {
  return new IncomingMessageType(IndexMessage.class);
}

Configuration

All configuration magic happens in Spring's application.yml.

A minimal configuration might look like:

# Default spring profile is local
spring:
  application:
    name: flusswerk-example

flusswerk:
  routing:
    incoming:
      - search.index
    outgoing:
      default: search.publish

This defaults to connecting to RabbitMQ on localhost:5672 with user and password guest, five threads, and up to five retries per message. The only outgoing route defined is default, which Flusswerk uses to automatically send messages. For most applications these are sensible defaults that work out of the box for local testing.
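Spelled out explicitly, these implicit defaults correspond to a configuration like the following (a sketch; the values are the documented defaults, and the property names follow the configuration tables in this document):

```yaml
# Equivalent explicit configuration (sketch; all values are defaults)
flusswerk:
  processing:
    threads: 5
  rabbitmq:
    hosts:
      - localhost:5672
    username: guest
    password: guest
  routing:
    incoming:
      - search.index
    outgoing:
      default: search.publish
```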

The connection information can be overwritten for different environments using Spring Boot profiles:

---
spring:
  profiles: production
flusswerk:
  rabbitmq:
    hosts:
      - rabbitmq.stg
    username: secret
    password: secret

The sections of the Flusswerk configuration

processing - control of processing

property default description
threads 5 number of threads to use for parallel processing

rabbitmq - Connection to RabbitMQ:

property default description
hosts localhost list of hosts to connect to
username guest RabbitMQ username
password guest RabbitMQ password

routing - Messages in and out

property default description
incoming list of queues to read from in order
outgoing routes to send messages to (format 'name: topic')
exchange flusswerk_default default exchange for all queues
dead letter exchange <exchange> + ".retry" default dead letter exchange for all queues
exchanges 'queue: exchange' mappings to override the default exchange per queue
dead letter exchanges 'queue: exchange' mappings to override the default dead letter exchange per queue
failure policies default how to handle messages with processing errors

routing.failure policies - how to handle messages with processing errors

property default description
retries 5 how many times to retry
retryRoutingKey where to send messages to retry later (dead lettering)
failedRoutingKey where to send messages that should not be processed again
backoff how long to wait until retrying a message
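Putting these properties together, a custom failure policy for an incoming queue might look like the sketch below. The property names are taken from the table above; the exact YAML key for "failure policies" (written here as failure-policies) and the per-queue nesting are assumptions, not confirmed by this document:

```yaml
# Sketch: custom failure handling for the queue search.index
# (key spelling "failure-policies" and per-queue nesting are assumptions)
flusswerk:
  routing:
    incoming:
      - search.index
    failure-policies:
      search.index:
        retries: 10
        backoff: 60s
        retryRoutingKey: search.index.retry
        failedRoutingKey: search.index.failed
```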

monitoring - Prometheus settings

property default description
prefix flusswerk prefix for Prometheus metrics

redis - Redis settings

property default description
address redis://localhost:6379 Redis connection string
password Redis password (optional)
lockWaitTimeout 5s how long to wait for a lock
keyspace flusswerk prefix of the keys in Redis (separated by ::)
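As a sketch, enabling centralized locking (see the Centralized Locking section) only requires the Redis block; all values below except the password are the documented defaults, and the password value is a placeholder:

```yaml
# Sketch: Redis connection for centralized locking
# (values are the documented defaults; the password is a placeholder)
flusswerk:
  redis:
    address: redis://localhost:6379
    password: changeme   # optional
    lockWaitTimeout: 5s
    keyspace: flusswerk
```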

Data Processing

To set up your data processing flow, define a Spring bean of type FlowSpec:

@Bean
public FlowSpec flowSpec(Reader reader, Transformer transformer, Writer writer) {
  return FlowBuilder.flow(IndexMessage.class, Document.class, IndexDocument.class)
      .reader(reader)
      .transformer(transformer)
      .writerSendingMessage(writer)
      .build();
}

With the Reader, Transformer and Writer implementing the Function interface:

Reader Function<IndexMessage, Document> loads the document from storage
Transformer Function<Document, IndexDocument> uses the Document to build up the data structure needed for indexing
Writer Function<IndexDocument, Message> indexes the data and returns a message for the next workflow step
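Since each step is just a Function, the steps compose like any other functions. The sketch below shows a Reader and Transformer as plain Function implementations; the Document and IndexDocument types are hypothetical stand-ins for your own data model, and the Flusswerk types (FlowBuilder, Message) are left out so the example is self-contained:

```java
import java.util.function.Function;

public class FlowSketch {

  // Hypothetical data model classes standing in for your own.
  record Document(String id, String text) {}
  record IndexDocument(String id, String searchableText) {}

  // Reader: loads the document for an id (here: a canned lookup).
  static class Reader implements Function<String, Document> {
    @Override
    public Document apply(String id) {
      return new Document(id, "Hello Flusswerk");
    }
  }

  // Transformer: builds the structure needed for indexing.
  static class Transformer implements Function<Document, IndexDocument> {
    @Override
    public IndexDocument apply(Document doc) {
      return new IndexDocument(doc.id(), doc.text().toLowerCase());
    }
  }

  public static void main(String[] args) {
    // Function.andThen chains the steps, just like Flusswerk does per message.
    IndexDocument result = new Reader().andThen(new Transformer()).apply("doc-1");
    System.out.println(result.searchableText()); // prints "hello flusswerk"
  }
}
```

Because each step is a pure function, every step can be unit-tested in isolation by calling apply() directly.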

Best Practices

Stateless Processing

All classes that do data processing (Reader, Transformer, Writer, ...) should be stateless, for two reasons:

First, it makes your code thread-safe and multiprocessing easy without you having to even think about it. Just keep it stateless and fly!

Second, it makes testing a breeze: you throw in data and check the data that comes out. Things can go wrong? Just check whether your code throws the right exceptions. Wherever you need to interact with external services, mock the behaviour, and you're good to go (the Flusswerk tests make heavy use of Mockito, btw).

If you absolutely have to introduce state, make sure your code is thread-safe.

Immutable Data

Wherever sensible, make your data classes immutable - set everything via the constructor and avoid setters. Implement equals() and hashCode(). This usually leads to more readable code and makes writing tests much easier. This applies to Message classes and to the classes that contain data.
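A minimal sketch of such a data class, using a Java record (records give you constructor-only initialization, equals() and hashCode() for free; the IndexDocument type here is a hypothetical example, not a Flusswerk class):

```java
import java.util.Objects;

public class ImmutableExample {

  // A record is immutable by construction and generates equals()/hashCode().
  record IndexDocument(String id, String text) {
    IndexDocument {
      // reject null values at construction time
      Objects.requireNonNull(id);
      Objects.requireNonNull(text);
    }
  }

  public static void main(String[] args) {
    IndexDocument a = new IndexDocument("doc-1", "hello");
    IndexDocument b = new IndexDocument("doc-1", "hello");
    // value semantics: equal content means equal objects
    System.out.println(a.equals(b)); // prints "true"
  }
}
```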

Does your particular data processing need to build up data over time, so it can't be immutable? Reconsider whether that is the best approach, but don't worry too much.

Manual Interaction with RabbitMQ

For manual interaction with RabbitMQ there is a Spring component of the same name:

RabbitMQ
ack(Message) acknowledges a Message received from a Queue
queue(String) returns the Queue instance to interact with a queue of the given name
topic(String) returns the Topic instance for the given name to send messages to
route(String) returns the Topic instance for the given route from application.yml

Error Handling

Any data processing can go wrong. Flusswerk supports two error handling modes:

  1. stop processing for a message completely. This behaviour is triggered by a StopProcessingException.
  2. retry processing for a message later. This behaviour is triggered by a RetryProcessingException or any other RuntimeException.

The default retry behaviour is to wait 30 seconds between retries and to try up to 5 times. If processing a message still keeps failing, it is then treated as if a StopProcessingException had been thrown and will be routed to a failed queue.
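The decision between the two modes usually lives inside a processing step. In the sketch below, the two exception classes are local stand-ins mirroring Flusswerk's StopProcessingException and RetryProcessingException (the real ones come from the framework), so the example is self-contained; the transform logic and serviceAvailable check are hypothetical:

```java
public class ErrorHandlingSketch {

  // Local stand-ins for Flusswerk's StopProcessingException / RetryProcessingException.
  static class StopProcessingException extends RuntimeException {
    StopProcessingException(String message) { super(message); }
  }

  static class RetryProcessingException extends RuntimeException {
    RetryProcessingException(String message) { super(message); }
  }

  // A processing step deciding between "give up" and "retry later".
  static String transform(String payload) {
    if (payload == null) {
      // Permanent failure: this message can never succeed, do not retry.
      throw new StopProcessingException("payload missing");
    }
    if (payload.startsWith("http") && !serviceAvailable()) {
      // Transient failure: the message should be retried later.
      throw new RetryProcessingException("remote service unavailable");
    }
    return payload.trim();
  }

  static boolean serviceAvailable() {
    return true; // stand-in for a real availability check
  }

  public static void main(String[] args) {
    System.out.println(transform("  hello  ")); // prints "hello"
  }
}
```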

For more fine-grained control, see the configuration parameters for flusswerk.routing.failure policies.

Collecting Metrics

Every Flusswerk application provides base metrics via a Prometheus endpoint:

flusswerk.processed.items total number of processed items since application start
flusswerk.execution.time total amount of time spent processing these items

To include custom metrics, get counters via MeterFactory. A bean of type FlowMetrics can also consume execution information of single flows (best to extend BaseMetrics for that).

The Prometheus endpoint is available at /actuator/prometheus.

Customize Logging

To customize log messages, provide a bean of type ProcessReport.

Centralized Locking

How to use

Flusswerk supports centralized locking of objects across different threads, Flusswerk apps, and even services unrelated to Flusswerk altogether. To use this feature, configure a Redis connection in application.yml and inject LockManager:

@Component
class Transformer implements Function<String, String> {

  private final LockManager lockManager;

  @Autowired
  public Transformer(LockManager lockManager) {
    this.lockManager = requireNonNull(lockManager);
  }

  public String apply(String id) {
    lockManager.acquire(id);
    // process data

    // release the lock manually (as early as possible)
    lockManager.release();
    // otherwise, Flusswerk will release the lock after the Writer/Cleanup step
    return id;
  }

}

Flusswerk always binds locks to the containing thread and automatically releases acquired locks after the cleanup step (after sending messages from the writer step).

A note on testing

Locking usually makes testing harder and more tedious. Flusswerk provides a NoOpLockManager that literally does nothing. In your tests, you can either provide mocks for LockManager or simply use the NoOpLockManager to ignore locking while testing other functionality.


Open Source at the Bayerische Staatsbibliothek

...from the MDZ Digital Library team at the Bavarian State Library

Versions

3.2.0
3.1.1
3.1.0