com.opencredo:concursus

Sonatype helps open source projects to set up Maven repositories on https://oss.sonatype.org/

GroupId: com.opencredo
ArtifactId: concursus
Last Version: 0.6
Release Date: (not given)
Type: pom
Description: Sonatype helps open source projects to set up Maven repositories on https://oss.sonatype.org/
Source Code Management: http://github.com/opencredo/concursus

Download concursus

Filename Size
concursus-0.6.pom 8 KB

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.opencredo/concursus/ -->
<dependency>
    <groupId>com.opencredo</groupId>
    <artifactId>concursus</artifactId>
    <version>0.6</version>
    <type>pom</type>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.opencredo/concursus/
implementation 'com.opencredo:concursus:0.6'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.opencredo/concursus/
implementation ("com.opencredo:concursus:0.6")

Buildr:

'com.opencredo:concursus:pom:0.6'

Ivy:

<dependency org="com.opencredo" name="concursus" rev="0.6">
  <artifact name="concursus" type="pom" />
</dependency>

Groovy Grape:

@Grapes(
@Grab(group='com.opencredo', module='concursus', version='0.6')
)

SBT:

libraryDependencies += "com.opencredo" % "concursus" % "0.6"

Leiningen:

[com.opencredo/concursus "0.6"]

Dependencies

There are no dependencies for this project. It is a standalone project that does not depend on any other jars.

Project Modules

There are no modules declared in this project.

Concursus


Concursus is a Java 8 framework for building applications that use CQRS and event sourcing patterns, with a Cassandra event log implementation.

See the wiki for further documentation, or browse the javadocs.

Getting Started

Create a project with dependencies on concursus-mapping, concursus-domain-json and jackson-datatype-jsr310:

<dependency>
    <groupId>com.opencredo</groupId>
    <artifactId>concursus-mapping</artifactId>
</dependency>

<dependency>
    <groupId>com.opencredo</groupId>
    <artifactId>concursus-domain-json</artifactId>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

The first thing we might want to do is generate some events. To begin with, we need an interface that defines the events we want to create:

@HandlesEventsFor("person")
public interface PersonEvents {
    @Initial
    void created(StreamTimestamp ts, String personId, String name, LocalDate dateOfBirth);
    void changedName(StreamTimestamp ts, String personId, String newName);
    void movedToAddress(StreamTimestamp ts, String personId, String addressId);
    @Terminal
    void deleted(StreamTimestamp ts, String personId);
}

Each method in this interface defines an event that can happen to a person. Calling one of these methods generates an event, which is sent to an EventOutChannel. Let's create a channel that simply prints each event to the console, and then create a proxy implementation of PersonEvents that sends events to this channel:

// Create an EventOutChannel that simply prints events to the command line
EventOutChannel outChannel = System.out::println;

// Create a proxy that sends events to the outChannel.
PersonEvents proxy = EventEmittingProxy.proxying(outChannel, PersonEvents.class);

// Send an event via the proxy.
proxy.created(StreamTimestamp.now(), "id1", "Arthur Putey", LocalDate.parse("1968-05-28"));

This will output a String like the following:

person:id1 created_0
at 2016-03-31T10:31:17.981Z/
with person/created_0{dateOfBirth=1968-05-28, name=Arthur Putey}

This means that an event of type created_0 occurred to the object person:id1 at 2016-03-31T10:31:17.981Z, and that this event had two parameters associated with it, name and dateOfBirth.
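To make the idea of an event-emitting proxy more concrete, here is a minimal sketch built only on the JDK's java.lang.reflect.Proxy. It is not Concursus's implementation: the class ProxySketch, the cut-down PersonEvents interface (no StreamTimestamp, fewer methods) and the text format are all assumptions made for illustration. It shows how a method call on an interface can be turned into a description of an event and handed to a channel.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for illustration only; not part of the Concursus API.
class ProxySketch {

    // A cut-down events interface: each method call represents one event.
    interface PersonEvents {
        void created(String personId, String name);
        void changedName(String personId, String newName);
    }

    // Builds a proxy that turns each method call into a text description
    // of the event and hands it to the given channel.
    static PersonEvents proxying(Consumer<String> channel) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString/equals/hashCode are not events; this sketch does not support them.
                throw new UnsupportedOperationException(method.getName());
            }
            // By convention in this sketch, the first argument is the aggregate id.
            String aggregateId = (String) args[0];
            List<Object> parameters = Arrays.asList(args).subList(1, args.length);
            channel.accept("person:" + aggregateId + " " + method.getName() + " " + parameters);
            return null; // event methods return void
        };
        return (PersonEvents) Proxy.newProxyInstance(
                PersonEvents.class.getClassLoader(),
                new Class<?>[] { PersonEvents.class },
                handler);
    }

    public static void main(String[] args) {
        PersonEvents proxy = proxying(System.out::println);
        proxy.created("id1", "Arthur Putey"); // prints: person:id1 created [Arthur Putey]
    }
}
```

The caller only ever sees the interface, so swapping the console channel for one that stores or serialises events requires no change to the calling code.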

We can have the event encoded as JSON if we use an EventOutChannel that performs the encoding:

// Create an EventOutChannel that formats events as JSON and sends them to a command line printer.
EventInChannel<String> print = System.out::println;
ObjectMapper objectMapper = new ObjectMapper()
        .findAndRegisterModules()
        .configure(SerializationFeature.INDENT_OUTPUT, true);
EventOutChannel outChannel = JsonEventOutChannel.using(objectMapper, print);

// Create a proxy that sends events to the outChannel.
PersonEvents proxy = EventEmittingProxy.proxying(outChannel, PersonEvents.class);

// Send an event via the proxy.
proxy.created(StreamTimestamp.now(), "id1", "Arthur Putey", LocalDate.parse("1968-05-28"));

This will output JSON like the following:

{
  "aggregateType" : "person",
  "aggregateId" : "id1",
  "name" : "created",
  "version" : "0",
  "eventTimestamp" : 1459420919667,
  "streamId" : "",
  "processingId" : "",
  "characteristics" : 1,
  "parameters" : {
    "dateOfBirth" : [ 1968, 5, 28 ],
    "name" : "Arthur Putey"
  }
}
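Two fields in this output are not immediately readable. The sketch below, which uses only java.time, assumes that eventTimestamp is a count of milliseconds since the Unix epoch (the value is consistent with the date of the earlier example) and that dateOfBirth uses the [year, month, day] array form that Jackson's JSR-310 module writes when dates are serialised as timestamps. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;

// Hypothetical helper for reading the JSON fields above; not part of Concursus.
class JsonFieldSketch {

    // Interprets the eventTimestamp value as milliseconds since the Unix epoch (UTC).
    static Instant toInstant(long eventTimestamp) {
        return Instant.ofEpochMilli(eventTimestamp);
    }

    // Rebuilds a LocalDate from the [year, month, day] array form.
    static LocalDate toLocalDate(int[] yearMonthDay) {
        return LocalDate.of(yearMonthDay[0], yearMonthDay[1], yearMonthDay[2]);
    }

    public static void main(String[] args) {
        System.out.println(toInstant(1459420919667L));            // 2016-03-31T10:41:59.667Z
        System.out.println(toLocalDate(new int[] {1968, 5, 28})); // 1968-05-28
    }
}
```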

Instead of simply printing things to the console, let's start storing events. We can use an InMemoryEventStore to begin with:

// Create an InMemoryEventStore, and a proxy that sends events to it.
InMemoryEventStore eventStore = InMemoryEventStore.empty();
PersonEvents proxy = EventEmittingProxy.proxying(eventStore.toEventOutChannel(), PersonEvents.class);

// Send an event via the proxy.
final String personId = UUID.randomUUID().toString(); // requires java.util.UUID
proxy.created(StreamTimestamp.now(), personId, "Arthur Putey", LocalDate.parse("1968-05-28"));

// Create an EventTypeMatcher based on the PersonEvents interface, and use it to map events back out of the store
EventTypeMatcher typeMatcher = EmitterInterfaceInfo.forInterface(PersonEvents.class).getEventTypeMatcher();
// Retrieve the stored events for the aggregate with id=person/personId, and print them to the console.
EventSource.retrievingWith(eventStore)
        .getEvents(typeMatcher, AggregateId.of("person", personId))
        .forEach(System.out::println);
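As a rough mental model of what an in-memory event store does, the following sketch keeps a list of events per aggregate in a HashMap. It is an assumption-laden illustration, not the InMemoryEventStore source: the class InMemoryStoreSketch, its Event type and the choice to return events oldest first are all hypothetical, and the real store's ordering may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names and structure are not taken from Concursus.
class InMemoryStoreSketch {

    // A minimal event: which aggregate it belongs to, what happened, and when.
    static final class Event {
        final String aggregateType;
        final String aggregateId;
        final String name;
        final long timestampMillis;

        Event(String aggregateType, String aggregateId, String name, long timestampMillis) {
            this.aggregateType = aggregateType;
            this.aggregateId = aggregateId;
            this.name = name;
            this.timestampMillis = timestampMillis;
        }

        String key() {
            return aggregateType + ":" + aggregateId;
        }

        @Override
        public String toString() {
            return key() + " " + name + " at " + timestampMillis;
        }
    }

    private final Map<String, List<Event>> eventsByAggregate = new HashMap<>();

    // Appends an event to the history of its aggregate.
    void store(Event event) {
        eventsByAggregate.computeIfAbsent(event.key(), k -> new ArrayList<>()).add(event);
    }

    // Returns a copy of the events for one aggregate, oldest first (a choice of this sketch).
    List<Event> getEvents(String aggregateType, String aggregateId) {
        List<Event> events = new ArrayList<>(eventsByAggregate.getOrDefault(
                aggregateType + ":" + aggregateId, Collections.emptyList()));
        events.sort(Comparator.comparingLong(e -> e.timestampMillis));
        return events;
    }

    public static void main(String[] args) {
        InMemoryStoreSketch store = new InMemoryStoreSketch();
        store.store(new Event("person", "id1", "created", 100L));
        store.getEvents("person", "id1").forEach(System.out::println);
    }
}
```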

Once we have stored events, we can replay them to event handlers, mapping them back into method calls on the PersonEvents interface:

// Create a mock handler for person events.
PersonEvents handler = mock(PersonEvents.class);

// Create an InMemoryEventStore, and a proxy that sends events to it.
InMemoryEventStore eventStore = InMemoryEventStore.empty();
PersonEvents proxy = EventEmittingProxy.proxying(eventStore.toEventOutChannel(), PersonEvents.class);

// Send an event via the proxy.
String personId = "id1";
proxy.created(StreamTimestamp.now(), personId, "Arthur Putey", LocalDate.parse("1968-05-28"));

// Replay the stored events for the person with id=person/personId to the handler instance.
DispatchingEventSource.dispatching(EventSource.retrievingWith(eventStore), PersonEvents.class)
        .replaying(personId)
        .replayAll(handler);

// Verify that the handler received the event.
verify(handler).created(any(StreamTimestamp.class), any(String.class), eq("Arthur Putey"), eq(LocalDate.parse("1968-05-28")));
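Replay is the reverse of emitting: a stored event is matched to a method on the events interface and invoked on a handler. The sketch below illustrates that idea with plain JDK reflection. It is not how DispatchingEventSource is implemented; ReplaySketch, StoredEvent and the simplified PersonEvents interface are hypothetical, and matching by method name alone is a simplification.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of replaying stored events to a handler; not the Concursus API.
class ReplaySketch {

    interface PersonEvents {
        void created(String personId, String name);
        void changedName(String personId, String newName);
    }

    // A stored event: the method name plus the arguments it was originally called with.
    static final class StoredEvent {
        final String name;
        final Object[] arguments;

        StoredEvent(String name, Object... arguments) {
            this.name = name;
            this.arguments = arguments;
        }
    }

    // Replays each stored event, in list order, by invoking the handler method with the same name.
    static void replayAll(List<StoredEvent> events, PersonEvents handler) {
        for (StoredEvent event : events) {
            for (Method method : PersonEvents.class.getMethods()) {
                if (method.getName().equals(event.name)) {
                    try {
                        method.invoke(handler, event.arguments);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException("Could not replay " + event.name, e);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        PersonEvents handler = new PersonEvents() {
            @Override
            public void created(String personId, String name) {
                log.add("created " + personId + " " + name);
            }

            @Override
            public void changedName(String personId, String newName) {
                log.add("changedName " + personId + " " + newName);
            }
        };
        replayAll(Arrays.asList(new StoredEvent("created", "id1", "Arthur Putey")), handler);
        System.out.println(log); // [created id1 Arthur Putey]
    }
}
```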

Check out the Examples for more detailed examples, including command processing and state-building.

Using Cassandra and Redis

Eventually you will want to store events more permanently. Cassandra and Redis event store implementations are provided in concursus-cassandra and concursus-redis respectively. You will need to create a suitable keyspace and tables in Cassandra before you can use it:

CREATE KEYSPACE IF NOT EXISTS concursus
  WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };

CREATE TABLE IF NOT EXISTS concursus.Event (
   aggregateType text,
   aggregateId text,
   eventTimestamp timestamp,
   streamId text,
   processingId timeuuid,
   name text,
   version text,
   parameters map<text, text>,
   characteristics int,
   PRIMARY KEY((aggregateType, aggregateId), eventTimestamp, streamId)
) WITH CLUSTERING ORDER BY (eventTimestamp DESC);

CREATE TABLE IF NOT EXISTS concursus.Catalogue (
    aggregateType text,
    bucket int,
    aggregateId text,
    PRIMARY KEY ((aggregateType, bucket), aggregateId)
) WITH CLUSTERING ORDER BY (aggregateId DESC);
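
The Event table's primary key groups rows into one partition per (aggregateType, aggregateId) pair and, within each partition, orders them by eventTimestamp descending. The following plain-Java sketch models only that layout, to show why reading a partition yields the newest event first. It does not talk to Cassandra, ignores the streamId clustering column, and the names EventTableSketch and Row are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the Event table layout; it does not use Cassandra.
class EventTableSketch {

    static final class Row {
        final String aggregateType;
        final String aggregateId;
        final long eventTimestamp;
        final String name;

        Row(String aggregateType, String aggregateId, long eventTimestamp, String name) {
            this.aggregateType = aggregateType;
            this.aggregateId = aggregateId;
            this.eventTimestamp = eventTimestamp;
            this.name = name;
        }
    }

    // Groups rows by partition key (aggregateType, aggregateId) and sorts each
    // partition by eventTimestamp descending, as the clustering order specifies.
    static Map<String, List<Row>> layout(List<Row> rows) {
        Map<String, List<Row>> partitions = new HashMap<>();
        for (Row row : rows) {
            partitions.computeIfAbsent(row.aggregateType + "/" + row.aggregateId,
                    k -> new ArrayList<>()).add(row);
        }
        for (List<Row> partition : partitions.values()) {
            partition.sort(Comparator.comparingLong((Row r) -> r.eventTimestamp).reversed());
        }
        return partitions;
    }

    public static void main(String[] args) {
        Map<String, List<Row>> partitions = layout(Arrays.asList(
                new Row("person", "id1", 100L, "created"),
                new Row("person", "id1", 200L, "changedName")));
        System.out.println(partitions.get("person/id1").get(0).name); // changedName
    }
}
```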

com.opencredo

OpenCredo

Versions

Version
0.6
0.5
0.4
0.3
0.2