elefantenstark
Elefantenstark is a PostgreSQL-powered worker queue for Java 8. It uses Postgres advisory locks to lock a work item while it is being processed.
The locking strategy can be adjusted to meet ordering requirements, e.g. that all work items sharing the same key must be processed one after another.
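As a rough illustration of how key-based ordering can follow from advisory locks: if the lock key is derived deterministically from the work item key, two items with the same key compete for the same lock and therefore cannot be processed concurrently. The sketch below is not Elefantenstark's implementation. The class `AdvisoryLockKeys`, the method `lockKeyFor`, and the use of `String.hashCode()` are assumptions made for illustration; only `pg_try_advisory_lock(bigint)` is a real PostgreSQL function.

```java
// Hypothetical helper, not part of Elefantenstark.
final class AdvisoryLockKeys {

    // PostgreSQL returns true if the lock was obtained, false if another
    // session already holds it. The call never blocks.
    static final String TRY_LOCK_SQL = "SELECT pg_try_advisory_lock(?)";

    private AdvisoryLockKeys() {}

    // Maps a work item key to a 64-bit advisory lock key.
    // Assumption: String.hashCode() is good enough for a sketch. Distinct
    // keys can collide, which only costs parallelism, not correctness.
    static long lockKeyFor(String workItemKey) {
        return (long) workItemKey.hashCode();
    }
}
```

A worker would bind `AdvisoryLockKeys.lockKeyFor(key)` to the parameter of `TRY_LOCK_SQL` and skip the work item whenever the query returns `false`.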
Initializing the database tables
The Initializer creates the default queue table and indexes.
withPostgres(
    connection -> {
      new Initializer().build(connection);
      validateTableIsAvailable(connection);
    });
Producing work
A Producer is thread-safe and adds a WorkItem to the queue.
withPostgresAndSchema(
    connection -> {
      WorkItem workItem = WorkItem.groupedOnKey("_test_key_", "_test_value_", 0);
      new Producer().produce(connection, workItem);
      WorkItem queuedWorkItem = queryForWorkItem(connection);
      assertEquals(workItem, queuedWorkItem);
    });
Consuming work
A Consumer can be sessionScoped (tied to the current connection) or transactionScoped (creates a transaction). A session-scoped consumer can modify state that is not rolled back when the consuming function fails; however, it can leave locks unreleased when the application is killed. Such locks are released only once the OS closes the network connection.
withPostgresAndSchema(
    connection -> {
      Producer producer = new Producer();
      Consumer consumer = Consumer.sessionScoped();
      producer.produce(connection, WorkItem.groupedOnKey("a", "b", 23));
      Optional<String> next = consumer.next(
          connection,
          workItemContext -> {
            capturedWork.set(workItemContext);
            return "OK+";
          });
    });
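The difference between the two consumer scopes can be pictured with PostgreSQL's two kinds of advisory locks. The sketch below assumes that the session scope corresponds to session-level locks and the transaction scope to transaction-level locks. The class `AdvisoryLockScopes` and its methods are hypothetical and not part of Elefantenstark's API; the SQL functions `pg_try_advisory_lock`, `pg_advisory_unlock` and `pg_try_advisory_xact_lock` are standard PostgreSQL.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical helper, not part of Elefantenstark.
final class AdvisoryLockScopes {

    // Session-level: held until explicitly unlocked or the connection ends.
    static final String SESSION_TRY_LOCK = "SELECT pg_try_advisory_lock(?)";
    static final String SESSION_UNLOCK = "SELECT pg_advisory_unlock(?)";
    // Transaction-level: released automatically on commit or rollback.
    static final String TRANSACTION_TRY_LOCK = "SELECT pg_try_advisory_xact_lock(?)";

    private AdvisoryLockScopes() {}

    // Runs a single-parameter query and returns its boolean result.
    private static boolean queryBoolean(Connection connection, String sql, long key)
            throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.setLong(1, key);
            try (ResultSet resultSet = statement.executeQuery()) {
                return resultSet.next() && resultSet.getBoolean(1);
            }
        }
    }

    // The caller must release the lock with unlockSession, also on failure.
    static boolean tryLockForSession(Connection connection, long key) throws SQLException {
        return queryBoolean(connection, SESSION_TRY_LOCK, key);
    }

    static boolean unlockSession(Connection connection, long key) throws SQLException {
        return queryBoolean(connection, SESSION_UNLOCK, key);
    }

    // Only useful with auto-commit disabled; otherwise the lock is gone
    // as soon as the statement's implicit transaction ends.
    static boolean tryLockForTransaction(Connection connection, long key) throws SQLException {
        return queryBoolean(connection, TRANSACTION_TRY_LOCK, key);
    }
}
```

With the session-level variant, a killed application cannot call `unlockSession`, so the lock stays until the server notices the dead connection. With the transaction-level variant there is nothing to release by hand, but any work done inside the transaction is rolled back if the consuming function fails.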
Builds
Snapshot builds are available on Sonatype Snapshots; release builds are available on Maven Central.
<dependencies>
  <dependency>
    <groupId>net.andreaskluth</groupId>
    <artifactId>elefantenstark</artifactId>
    <version>0.1-RELEASE</version>
  </dependency>
</dependencies>