Java Dynamic SQS Listener - Spring Starter - Spring Cloud Schema Registry Example - Producer Two

Includes a second example of a service producing messages whose schema is registered in the Spring Cloud Schema Registry and is in a different format to the first.

GroupId: com.jashmore
ArtifactId: spring-cloud-schema-registry-producer-two
Last Version: 3.1.1
Type: jar

How to add to project

Maven

<!-- https://jarcasting.com/artifacts/com.jashmore/spring-cloud-schema-registry-producer-two/ -->
<dependency>
    <groupId>com.jashmore</groupId>
    <artifactId>spring-cloud-schema-registry-producer-two</artifactId>
    <version>3.1.1</version>
</dependency>

Gradle (Groovy DSL)

// https://jarcasting.com/artifacts/com.jashmore/spring-cloud-schema-registry-producer-two/
implementation 'com.jashmore:spring-cloud-schema-registry-producer-two:3.1.1'

Gradle (Kotlin DSL)

// https://jarcasting.com/artifacts/com.jashmore/spring-cloud-schema-registry-producer-two/
implementation ("com.jashmore:spring-cloud-schema-registry-producer-two:3.1.1")

Buildr

'com.jashmore:spring-cloud-schema-registry-producer-two:jar:3.1.1'

Ivy

<dependency org="com.jashmore" name="spring-cloud-schema-registry-producer-two" rev="3.1.1">
  <artifact name="spring-cloud-schema-registry-producer-two" type="jar" />
</dependency>

Grape

@Grapes(
    @Grab(group='com.jashmore', module='spring-cloud-schema-registry-producer-two', version='3.1.1')
)

SBT

libraryDependencies += "com.jashmore" % "spring-cloud-schema-registry-producer-two" % "3.1.1"

Leiningen

[com.jashmore/spring-cloud-schema-registry-producer-two "3.1.1"]

Dependencies

compile (6)

Group / Artifact Type Version
org.springframework.boot : spring-boot-starter jar
com.google.guava : guava jar 29.0-jre
com.jashmore : avro-spring-cloud-schema-registry-sqs-client jar 3.1.1
software.amazon.awssdk : sqs jar
ch.qos.logback : logback-core jar 1.2.3
ch.qos.logback : logback-classic jar 1.2.3

provided (3)

Group / Artifact Type Version
org.projectlombok : lombok jar 1.18.12
net.jcip : jcip-annotations jar 1.0
com.github.spotbugs : spotbugs-annotations jar 4.0.2

test (5)

Group / Artifact Type Version
org.junit.jupiter : junit-jupiter jar 5.6.2
org.mockito : mockito-junit-jupiter jar 3.3.3
org.mockito : mockito-core jar 3.3.3
org.hamcrest : hamcrest jar 2.2
org.assertj : assertj-core jar 3.12.2

Project Modules

There are no modules declared in this project.

Java Dynamic SQS Listener

The Java Dynamic SQS Listener is a library that simplifies listening to messages on an AWS SQS queue. It has been built from the ground up to be framework agnostic, easily customisable, and to allow the configuration to be changed dynamically at runtime.

Getting Started

The following provides some examples using the library with different languages or frameworks.

Spring Boot Quick Guide

  1. Include the dependency:

    <dependency>
        <groupId>com.jashmore</groupId>
        <artifactId>java-dynamic-sqs-listener-spring-starter</artifactId>
        <version>${sqs.listener.version}</version>
    </dependency>

    or

    dependencies {
        implementation("com.jashmore:java-dynamic-sqs-listener-spring-starter:${sqs.listener.version}")
    }
  2. In one of your beans, attach a @QueueListener annotation to a method indicating that it should process messages from a queue.

    @Service
    public class MyMessageListener {
    
        // The queue here can point to your SQS server, e.g. a
        // local SQS server or one on AWS
        @QueueListener("${insert.queue.url.here}")
        public void processMessage(@Payload final String payload) {
            // process the message payload here
        }
    }
    

    This will use any configured SqsAsyncClient in the application context for connecting to the queue, otherwise a default will be provided that will look for AWS credentials/region from multiple areas, like the environment variables.

See Spring Starter Minimal Example for a minimal example of configuring in a Spring Boot application with a local ElasticMQ SQS Server.

Java Core Quick Guide

  1. Include the dependency:

    <dependency>
        <groupId>com.jashmore</groupId>
        <artifactId>java-dynamic-sqs-listener-core</artifactId>
        <version>${sqs.listener.version}</version>
    </dependency>

    or

    dependencies {
        implementation("com.jashmore:java-dynamic-sqs-listener-core:${sqs.listener.version}")
    }
  2. Create a MessageListenerContainer for a specific queue.

    public class MyClass {
    
        public static void main(String[] args) throws InterruptedException {
            final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); // or your own custom client
            final QueueProperties queueProperties = QueueProperties.builder().queueUrl("${insert.queue.url.here}").build();
            final MessageListenerContainer container = new BatchingMessageListenerContainer(
                "listener-identifier",
                queueProperties,
                sqsAsyncClient,
                () ->
                    new LambdaMessageProcessor(
                        sqsAsyncClient,
                        queueProperties,
                        message -> {
                            // process message here
    
                        }
                    ),
                ImmutableBatchingMessageListenerContainerProperties
                    .builder()
                    .concurrencyLevel(10)
                    .batchSize(5)
                    .batchingPeriod(Duration.ofSeconds(20))
                    .build()
            );
            container.start();
            Runtime.getRuntime().addShutdownHook(new Thread(container::stop));
            Thread.currentThread().join();
        }
    }
    

See the Core Example for a more complicated example that uses a local ElasticMQ server and dynamically changes the concurrency of message processing while the app is running.

Kotlin Quick Guide

  1. Include the dependency:

    <dependencies>
        <dependency>
            <groupId>com.jashmore</groupId>
            <artifactId>java-dynamic-sqs-listener-core</artifactId>
            <version>${sqs.listener.version}</version>
        </dependency>
        <dependency>
            <groupId>com.jashmore</groupId>
            <artifactId>core-kotlin-dsl</artifactId>
            <version>${sqs.listener.version}</version>
        </dependency>
    </dependencies>

    or

    dependencies {
        implementation("com.jashmore:java-dynamic-sqs-listener-core:${sqs.listener.version}")
        implementation("com.jashmore:core-kotlin-dsl:${sqs.listener.version}")
    }
  2. Create a MessageListenerContainer for a specific queue.

    fun main() {
        val sqsAsyncClient = SqsAsyncClient.create()
    
        val container = batchingMessageListener("identifier", sqsAsyncClient, "url") {
            concurrencyLevel = { 10 }
            batchSize = { 5 }
            batchingPeriod =  { Duration.ofSeconds(20) }
    
            processor = lambdaProcessor {
                method { message ->
                    log.info("Message: {}", message.body())
                }
            }
        }
    
        container.start()
        Runtime.getRuntime().addShutdownHook(
            Thread {
                container.stop()
            }
        )
        Thread.currentThread().join()
    }

See the Core Kotlin Example for a full example running a Kotlin App that listens to a local ElasticMQ SQS Server.

Ktor Quick Guide

  1. Include the dependency:

    <dependency>
        <groupId>com.jashmore</groupId>
        <artifactId>java-dynamic-sqs-listener-ktor-core</artifactId>
        <version>${sqs.listener.version}</version>
    </dependency>

    or

    dependencies {
        implementation("com.jashmore:java-dynamic-sqs-listener-ktor-core:${sqs.listener.version}")
    }
  2. Add a message listener to the server

    fun main() {
        val sqsAsyncClient = SqsAsyncClient.create() // replace with however you want to configure this client
        val queueUrl = "replaceWithQueueUrl"
        val server = embeddedServer(Netty, 8080) {
             batchingMessageListener("listener-identifier", sqsAsyncClient, queueUrl) {
                  concurrencyLevel = { 10 }
                  batchSize = { 5 }
                  batchingPeriod =  { Duration.ofSeconds(20) }
    
                  processor = lambdaProcessor {
                      method { message ->
                          log.info("Message: {}", message.body())
                      }
                  }
             }
        }
        server.start()
        Runtime.getRuntime().addShutdownHook(
            Thread {
                server.stop(1, 30_000)
            }
        )
        Thread.currentThread().join()
    }

See the Ktor Core Example for a full example running a Ktor framework that listens to a local ElasticMQ SQS Server.

Core Infrastructure

This library has been divided into isolated components each with distinct responsibilities. The following is a diagram describing a simple flow of a single SQS message flowing through each of the components to eventually be executed by some code.

Core Framework Architecture Diagram

Details about each component:

  • The MessageRetriever handles obtaining messages from the SQS queue. This can optimise the retrieval of messages by batching requests for messages or prefetching messages before they are needed.
  • The MessageProcessor controls the processing of a message from the queue by delegating it to the corresponding Java method that handles the message.
  • The ArgumentResolverService is used by the MessageProcessor to populate the arguments of the message listener method. For example, a parameter with the @Payload annotation will be resolved with the body of the message cast to that type (e.g. a POJO).
  • The MessageBroker is the main container that controls the whole flow of messages from the MessageRetriever to the MessageProcessor. It can determine when more messages are to be processed and the rate of concurrency for processing messages.
  • The MessageResolver is used after successful processing of the message and its responsibility is to remove the message from the SQS queue, so it is not processed again if there is a re-drive policy.
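
As a rough illustration of how the responsibilities above fit together, the following is a minimal, single-threaded sketch in plain Java. The `FlowSketch` class and its `Retriever`, `Processor` and `Resolver` interfaces are hypothetical simplifications written for this example; they are not the library's actual types, which are asynchronous and operate on SQS messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical, simplified stand-ins for the components described above.
class FlowSketch {

    interface Retriever {
        Optional<String> retrieve(); // obtains the next message, if there is one
    }

    interface Processor {
        void process(String message) throws Exception; // runs the listener code
    }

    interface Resolver {
        void resolve(String message); // removes a successfully processed message
    }

    // Plays the role of the broker: moves messages from the retriever to the
    // processor and only resolves those that were processed without an error.
    static void runBroker(Retriever retriever, Processor processor, Resolver resolver) {
        Optional<String> next;
        while ((next = retriever.retrieve()).isPresent()) {
            String message = next.get();
            try {
                processor.process(message);
                resolver.resolve(message);
            } catch (Exception e) {
                // left unresolved, so a real queue could deliver it again
            }
        }
    }

    // Runs the flow over some in-memory messages and returns the resolved ones.
    static List<String> demo(List<String> messages) {
        Queue<String> queue = new ArrayDeque<>(messages);
        List<String> resolved = new ArrayList<>();
        runBroker(
            () -> Optional.ofNullable(queue.poll()),
            message -> {
                if (message.startsWith("bad")) {
                    throw new IllegalStateException("failed to process " + message);
                }
            },
            resolved::add
        );
        return resolved;
    }
}
```

Calling `FlowSketch.demo(Arrays.asList("a", "bad-b", "c"))` resolves only `a` and `c`, since the message that fails processing is never handed to the resolver.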

For more information about the core implementations provided by this library, see the Core Implementations Overview.

Dependencies

The framework relies on the following dependencies, so it is recommended to upgrade the application's dependencies to versions close to these for compatibility.

See the gradle.properties for the specific versions of these dependencies.

How to Guides

More in-depth guides on how to configure this library:

  1. How to Connect to an AWS SQS Queue: necessary for actually using this framework in live environments
  2. Core Framework How To Guides
    1. How to implement a custom ArgumentResolver: useful for changing resolution of arguments in the message listener
    2. How to manually acknowledge message: useful for when you want to mark the message as successfully processed before the method has finished executing
    3. How to add Brave Tracing: for including Brave Tracing information to your messages
    4. How to implement a custom MessageRetriever: useful for changing the logic for obtaining messages from the SQS queue if the core implementations do not provide the required functionality
    5. How to extend a message's visibility during processing: useful for extending the visibility of a message in the case of long processing so it does not get put back on the queue while processing
    6. How to create a MessageProcessingDecorator: guide for writing your own decorator to wrap a message listener's processing of a message
    7. How to use the Core Kotlin DSL: guide for using the core library easier using a Kotlin DSL for constructing message listeners
  3. Spring How To Guides
    1. How to add a custom ArgumentResolver to a Spring application: useful for integrating custom argument resolution code to be included in a Spring Application. See How to implement a custom ArgumentResolver for how to build a new ArgumentResolver from scratch.
    2. How to add Brave Tracing: for including Brave Tracing information to your messages
    3. How to add custom MessageProcessingDecorators: guide on how to autowire custom MessageProcessingDecorators into your Spring Queue Listeners.
    4. How to Extend Message Visibility: manually or automatically extend a message's visibility during processing.
    5. How to have dynamic properties: guide for overriding the listener annotations to use custom logic.
    6. How to customise argument resolution: guide for overriding the entire argument resolution logic
    7. How to add your own queue listener: useful for defining your own annotation for the queue listening without the verbosity of a custom queue listener
    8. How to write Spring Integration Tests: you actually want to test what you are writing, right?
    9. How to prevent listeners from starting on startup: guide for preventing containers from starting when the Spring application has started up.
    10. How to Start/Stop Queue Listeners: guide for starting and stopping the processing of messages for specific queue listeners
    11. How to connect to multiple AWS Accounts: guide for listening to queues across multiple AWS Accounts
    12. How to version message payload schemas: guide for versioning payloads using Avro and the Spring Cloud Schema Registry.
  4. Ktor How to Guides
    1. How to Register Message Listeners: guide for including message listeners in a Ktor application.

Common Use Cases/Explanations

Core Processor - How to de-serialise a JSON Payload

When you are using the reflection based CoreMessageProcessor (which is the default for Spring Boot applications), the payload of the message is de-serialised by default using Jackson and therefore any Jackson compatible POJO class can be used with the @Payload annotation.

@Service
public class MyMessageListener {

    @QueueListener(value = "${insert.queue.url.here}")
    public void processMessage(@Payload final MyPojo payload) {
        // process the message payload here
    }

    public static class MyPojo {

        private String name;

        public MyPojo() {
            this.name = null;
        }

        public MyPojo(String name) {
            this.name = name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }
}

Spring - Adding a custom argument resolver

There are some core ArgumentResolvers provided in the application but custom ones can be defined if they don't cover your use case. As an example, the following is how we can populate the message listener argument with the payload in uppercase.

  1. We will use an annotation on the parameter to indicate how the message should be resolved.

    @Retention(value = RUNTIME)
    @Target(ElementType.PARAMETER)
    public @interface UppercasePayload {
    }
    
  2. Implement the ArgumentResolver interface, which will contain the logic for converting the message payload to uppercase.

    public class UppercasePayloadArgumentResolver implements ArgumentResolver<String> {
    
        @Override
        public boolean canResolveParameter(MethodParameter methodParameter) {
            return (
                methodParameter.getParameter().getType().isAssignableFrom(String.class) &&
                AnnotationUtils.findParameterAnnotation(methodParameter, UppercasePayload.class).isPresent()
            );
        }
    
        @Override
        public String resolveArgumentForParameter(QueueProperties queueProperties, Parameter parameter, Message message)
            throws ArgumentResolutionException {
            return message.body().toUpperCase();
        }
    }
    

    You may be curious why we use a custom AnnotationUtils.findParameterAnnotation function instead of getting the annotation directly from the parameter. The reason for this is due to potential proxying of beans in the application, such as by applying Aspects around your code via CGLIB. As libraries, like CGLIB, won't copy the annotations to the proxied classes the resolver needs to look through the class hierarchy to find the original class to get the annotations. For more information about this, take a look at the JavaDoc provided in AnnotationUtils. You can also see an example of testing this problem in PayloadArgumentResolver_ProxyClassTest.java.

    Also, as this library is not Spring specific, the Spring Annotation classes cannot be used.

  3. Include the custom ArgumentResolver in the application context for automatic injection into the ArgumentResolverService.

    @Configuration
    public class MyCustomConfiguration {
    
        @Bean
        public UppercasePayloadArgumentResolver uppercasePayloadArgumentResolver() {
            return new UppercasePayloadArgumentResolver();
        }
    }
    
  4. Use the new annotation in your message listener

    @Component
    public class MyService {
    
        @QueueListener("${insert.queue.url.here}") // The queue here can point to your SQS server, e.g. a local SQS server or one on AWS
        public void processMessage(@UppercasePayload final String uppercasePayload) {
            // process the message payload here
        }
    }
    

For a more extensive guide for doing this, take a look at Spring - How to add a custom Argument Resolver. If you are using the core or Ktor library, you can look at the Core - How to Implement a Custom Argument Resolver for a guide on creating a new argument resolver.
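
The proxying problem mentioned in step 2 can be reproduced with plain Java reflection. The sketch below is only an illustration: `ProxyAnnotationSketch`, `ListenerProxy` and `findAnnotationInHierarchy` are hypothetical names, and the hand-written subclass merely stands in for what a proxying library might generate. It is not the implementation of the library's AnnotationUtils.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Optional;

class ProxyAnnotationSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface UppercasePayload {}

    static class Listener {
        public void processMessage(@UppercasePayload String payload) {}
    }

    // Stands in for a generated proxy subclass: the override carries no annotations.
    static class ListenerProxy extends Listener {
        @Override
        public void processMessage(String payload) {
            super.processMessage(payload);
        }
    }

    // Walks up the class hierarchy until a declaration of the method is found
    // that has the annotation on the given parameter.
    static <A extends Annotation> Optional<A> findAnnotationInHierarchy(
        Class<?> type, String methodName, Class<?>[] parameterTypes, int parameterIndex, Class<A> annotationType
    ) {
        for (Class<?> current = type; current != null; current = current.getSuperclass()) {
            try {
                Method method = current.getDeclaredMethod(methodName, parameterTypes);
                A annotation = method.getParameters()[parameterIndex].getAnnotation(annotationType);
                if (annotation != null) {
                    return Optional.of(annotation);
                }
            } catch (NoSuchMethodException e) {
                // not declared on this class, keep looking in the superclass
            }
        }
        return Optional.empty();
    }

    // Looks only at the proxy's own method, as a naive resolver would.
    static boolean directLookupFindsAnnotation() {
        try {
            Method proxied = ListenerProxy.class.getDeclaredMethod("processMessage", String.class);
            return proxied.getParameters()[0].isAnnotationPresent(UppercasePayload.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    static boolean hierarchyLookupFindsAnnotation() {
        return findAnnotationInHierarchy(
            ListenerProxy.class, "processMessage", new Class<?>[] { String.class }, 0, UppercasePayload.class
        ).isPresent();
    }
}
```

Here `directLookupFindsAnnotation()` returns false because parameter annotations are not inherited by an overriding method, while `hierarchyLookupFindsAnnotation()` returns true because it reaches the original declaration.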

Increasing the concurrency limit

The library places no limit on the number of messages that can be processed concurrently; the only bound is the number of threads that the application can handle. Therefore, if you are happy to spin up one thread per concurrent message, you can raise the concurrency level as high as you wish.

Core

public class SomeClass {

    public MessageListenerContainer container() {
        return new BatchingMessageListenerContainer(
            // other configuration
            ImmutableBatchingMessageListenerContainerProperties
                .builder()
                .concurrencyLevel(100)
                .batchSize(5)
                .batchingPeriod(Duration.ofSeconds(20))
                .build()
        );
    }
}

Spring Boot

@Service
public class MyMessageListener {

    @QueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 100)
    public void processMessage(@Payload final String payload) {
        // process message here
    }
}

Kotlin DSL/Ktor

coreMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
    broker = concurrentBroker {
        concurrencyLevel = { 100 }
    }
    // other configuration
}

or

batchingMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
    concurrencyLevel = { 100 }
    // other configuration
}

How to Mark the message as successfully processed

When the method executing the message finishes without throwing an exception, the MessageProcessor will acknowledge the message as a success, therefore removing it from the queue. When the method throws an exception, the message will not be acknowledged and if there is a re-drive policy the queue will perform another attempt of processing the message.

public class MyMessageListener {

    @QueueListener(value = "queue-name")
    public void processMessage(@Payload final String payload) {
        // if this does not throw an exception it will be considered successfully processed
    }
}

or

lambdaProcessor {
    method { message ->
        // if this does not throw an exception it will be considered successfully processed
    }
}

If the method contains an Acknowledge argument it is now up to the method to manually acknowledge the message as a success. The MessageProcessor has handed off control to the message listener and will not acknowledge the message automatically when the method executes without throwing an exception.

public class MyMessageListener {

    @QueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10, maxPeriodBetweenBatchesInMs = 2000)
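    public void processMessage(@Payload final String payload, final Acknowledge acknowledge)
        throws InterruptedException, ExecutionException {
        if (someCondition()) {
            // wait for the acknowledgement to complete before returning
            CompletableFuture<?> future = acknowledge.acknowledgeSuccessful();
            future.get();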
        }
    }
}

or

lambdaProcessor {
    method { message, acknowledge ->
        if (someCondition()) {
            acknowledge.acknowledgeSuccessful().get()
        }
    }
}
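
The two acknowledgement modes described in this section can be summarised with a small, self-contained sketch. Everything in it is hypothetical (`AcknowledgeSketch`, its nested interfaces and the in-memory `deleted` list stand in for the real processor and queue); it only illustrates the rule of who is responsible for acknowledging, not how the library implements it.

```java
import java.util.ArrayList;
import java.util.List;

class AcknowledgeSketch {

    // Hypothetical stand-in for the handle given to listeners that acknowledge manually.
    interface Acknowledge {
        void acknowledgeSuccessful();
    }

    interface AutoListener {
        void onMessage(String body) throws Exception;
    }

    interface ManualListener {
        void onMessage(String body, Acknowledge acknowledge) throws Exception;
    }

    // Messages that have been acknowledged and so removed from the pretend queue.
    final List<String> deleted = new ArrayList<>();

    // No Acknowledge argument: returning normally acknowledges, throwing does not.
    void processAuto(String body, AutoListener listener) {
        try {
            listener.onMessage(body);
            deleted.add(body);
        } catch (Exception e) {
            // not acknowledged, so the queue may deliver the message again
        }
    }

    // Acknowledge argument present: only an explicit call acknowledges the message.
    void processManual(String body, ManualListener listener) {
        try {
            listener.onMessage(body, () -> deleted.add(body));
        } catch (Exception e) {
            // stays on the queue unless the listener acknowledged before failing
        }
    }

    static List<String> demo() {
        AcknowledgeSketch sketch = new AcknowledgeSketch();
        sketch.processAuto("auto-ok", body -> {});
        sketch.processAuto("auto-fails", body -> {
            throw new IllegalStateException("boom");
        });
        sketch.processManual("manual-acknowledged", (body, acknowledge) -> acknowledge.acknowledgeSuccessful());
        sketch.processManual("manual-not-acknowledged", (body, acknowledge) -> {});
        return sketch.deleted;
    }
}
```

In `demo()`, only `auto-ok` and `manual-acknowledged` end up in the `deleted` list. Note that `manual-not-acknowledged` returns without an exception and is still not removed.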

Setting up a queue listener that batches requests for messages

The Spring Cloud AWS Messaging @SqsListener works by requesting a batch of messages from SQS and only requesting more once they have all been processed. The disadvantage of this approach is that if 9 of the 10 messages finish in 10 milliseconds but one takes 10 seconds, no other messages will be picked up until that last message is complete. The @QueueListener provides the same basic functionality, but it also has a timeout after which it will request more messages for any threads that are ready for another message.

Core/Spring Boot

public class MyMessageListener {

    @QueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10, maxPeriodBetweenBatchesInMs = 2000)
    public void processMessage(@Payload final String payload) {
        // process the message payload here
    }
}

Kotlin/Ktor

batchingMessageListener("listener-identifier", sqsAsyncClient, "${insert.queue.url.here}") {
    concurrencyLevel = { 10 }
    batchSize = { 10 }
    batchingPeriod = { Duration.ofSeconds(2) }
    processor = lambdaProcessor {
        method { message ->
          // process the message payload here
        }
    }
}

In the example above, up to 10 messages are processed at once. When there are threads ready for more messages, the listener waits for a maximum of 2 seconds before requesting messages for the threads that are waiting.
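
The "batch size or timeout, whichever comes first" behaviour can be sketched with a blocking queue of pending requests. This is an assumption-laden illustration rather than the library's retriever: `BatchingSketch`, `awaitBatch` and the idea of modelling each waiting thread as an object on a queue are all hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BatchingSketch {

    // Blocks for the first request for a message, then keeps collecting requests until
    // either batchSize requests are waiting or batchingPeriod has elapsed.
    // Returns how many messages should be asked for in a single call to the queue.
    static int awaitBatch(BlockingQueue<Object> requests, int batchSize, Duration batchingPeriod)
        throws InterruptedException {
        requests.take();
        int waiting = 1;
        long deadline = System.nanoTime() + batchingPeriod.toNanos();
        while (waiting < batchSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0 || requests.poll(remaining, TimeUnit.NANOSECONDS) == null) {
                break; // the batching period ran out before the batch was full
            }
            waiting++;
        }
        return waiting;
    }

    // Queues up the given number of pending requests and reports the resulting batch size.
    static int demo(int queuedRequests, int batchSize, Duration batchingPeriod) {
        BlockingQueue<Object> requests = new LinkedBlockingQueue<>();
        for (int i = 0; i < queuedRequests; i++) {
            requests.add(new Object());
        }
        try {
            return awaitBatch(requests, batchSize, batchingPeriod);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With three waiting threads, a batch size of 10 and a short batching period, `demo` returns 3 once the period expires. With twelve waiting threads it returns 10 straight away, leaving two for the next batch.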

Setting up a queue listener that prefetches messages

When the volume of messages for a service is extremely high, prefetching messages may be a way to optimise the throughput of the application. In this example, if the number of prefetched messages is below the desired number of prefetched messages, the listener will try to get as many messages as possible up to the maximum specified.

Note: because at most 10 messages can be obtained from SQS in a single request, setting maxPrefetchedMessages more than 10 above desiredMinPrefetchedMessages will not provide much value, as no more messages are prefetched once the number of prefetched messages has reached the desired minimum.
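
The arithmetic behind this note can be written out as a small function. This is one reading of the description above and not the library's code; `PrefetchSketch` and `messagesToRequest` are hypothetical names.

```java
class PrefetchSketch {

    // SQS returns at most 10 messages per receive request.
    static final int SQS_MAX_MESSAGES_PER_REQUEST = 10;

    // How many messages to ask for next, given how many are already prefetched.
    static int messagesToRequest(int currentlyPrefetched, int desiredMinPrefetchedMessages, int maxPrefetchedMessages) {
        if (currentlyPrefetched >= desiredMinPrefetchedMessages) {
            return 0; // enough messages are buffered, so no prefetching is needed
        }
        int room = maxPrefetchedMessages - currentlyPrefetched;
        return Math.max(0, Math.min(SQS_MAX_MESSAGES_PER_REQUEST, room));
    }
}
```

With a desired minimum of 5 and a maximum of 10, having 3 messages buffered leads to a request for 7 more. With the maximum raised to 30, the most that can ever be buffered under this reading is 14 (4 buffered plus one request of 10), which is why a maximum far above the desired minimum adds little.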

Spring Boot

The @PrefetchingQueueListener annotation can be used to prefetch messages in a background thread while processing the existing messages. The usage is something like this:

@Service
public class MyMessageListener {

    @PrefetchingQueueListener(
        value = "${insert.queue.url.here}",
        concurrencyLevel = 10,
        desiredMinPrefetchedMessages = 5,
        maxPrefetchedMessages = 10
    )
    public void processMessage(@Payload final String payload) {
        // process the message payload here
    }
}

Kotlin DSL/Ktor

prefetchingMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
    concurrencyLevel = { 10 }
    desiredPrefetchedMessages = 5
    maxPrefetchedMessages = 10

    processor = lambdaProcessor {
        methodWithVisibilityExtender { message, _ ->
            // process the message payload here
        }
    }
}

Listening to a FIFO SQS Queue

FIFO SQS Queues can be used when the order of the SQS messages is important. The FIFO message listener guarantees that messages in the same message group are run in the order they were generated and that no two messages in the same group are executed concurrently. For more information about the configuration options for the message listener, take a look at the FifoMessageListenerContainerProperties or the specific annotation or DSL builder for the framework implementation.

Java

public class Main {

    public static void main(String[] args) throws InterruptedException {
        final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); // or your own custom client
        final QueueProperties queueProperties = QueueProperties.builder().queueUrl("${insert.queue.url.here}").build();
        final MessageListenerContainer container = new FifoMessageListenerContainer(
            queueProperties,
            sqsAsyncClient,
            () ->
                new LambdaMessageProcessor(
                    sqsAsyncClient,
                    queueProperties,
                    message -> {
                        // process the message here
                    }
                ),
            ImmutableFifoMessageListenerContainerProperties.builder().identifier("listener-identifier").concurrencyLevel(10).build()
        );
        container.start();
        Runtime.getRuntime().addShutdownHook(new Thread(container::stop));
        Thread.currentThread().join();
    }
}

Spring Boot

@Component
class MessageListeners {

    @FifoQueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10)
    public void fifoListener(@Payload final String body) {
        // process message here
    }
}

Kotlin DSL/Ktor

fifoMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
    concurrencyLevel = { 10 }

    processor = lambdaProcessor {
        method { message ->
            // process the message payload here
        }
    }
}
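
To make the FIFO guarantee concrete, the following sketch shows one way per-group ordering could be enforced in plain Java, by chaining each task after the previous task of the same message group. It is an illustration under simplifying assumptions (no retries, failures do not block the group) and uses hypothetical names (`FifoGroupSketch`, `submit`); it is not how the FifoMessageListenerContainer is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FifoGroupSketch {

    // The most recently submitted task for each message group.
    private final Map<String, CompletableFuture<Void>> tailByGroup = new ConcurrentHashMap<>();
    private final ExecutorService executor;

    FifoGroupSketch(ExecutorService executor) {
        this.executor = executor;
    }

    // Chains the task after the previous task of the same group, so a group is processed
    // serially and in submission order while different groups can run in parallel.
    CompletableFuture<Void> submit(String groupId, Runnable task) {
        return tailByGroup.compute(groupId, (id, tail) -> {
            CompletableFuture<Void> previous = tail == null ? CompletableFuture.completedFuture(null) : tail;
            // handle(...) swallows an earlier failure so later tasks still run in this sketch
            return previous.handle((result, error) -> null).thenRunAsync(task, executor);
        });
    }

    // Submits five messages to each of two groups and returns the order they were processed in.
    static List<String> demo() {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            FifoGroupSketch sketch = new FifoGroupSketch(executor);
            List<String> processed = Collections.synchronizedList(new ArrayList<>());
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 1; i <= 5; i++) {
                String a = "a" + i;
                String b = "b" + i;
                futures.add(sketch.submit("groupA", () -> processed.add(a)));
                futures.add(sketch.submit("groupB", () -> processed.add(b)));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return processed;
        } finally {
            executor.shutdown();
        }
    }
}
```

The list returned by `demo()` may interleave the two groups differently on each run, but within it `a1` to `a5` always appear in order, as do `b1` to `b5`.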

Comparing other SQS Libraries

If you want to see the difference in usage between this library and others like the Spring Cloud AWS Messaging and Amazon SQS Java Messaging Library, take a look at the sqs-listener-library-comparison module. This allows you to test the performance and usage of each library for different scenarios, such as heavy IO message processing, etc.

Examples

See examples for all the available examples.

Testing locally an example Spring Boot app with the Spring Starter

The easiest way to see the framework working is to run one of the examples locally. These use an in memory ElasticMQ SQS Server to simplify getting started. For example, to run a sample Spring Application you can use the Spring Starter Example.

  1. Build the framework
gradle build -x test -x integrationTest
  2. Run the Spring Starter Example Spring Boot app
(cd examples/spring-starter-example && gradle bootRun)

Testing locally a dynamic concurrency example

This shows an example of running the SQS Listener in a Java application that will dynamically change the concurrency level while it is executing.

This example works by having a thread constantly placing new messages while the SQS Listener will randomly change the rate of concurrency every 10 seconds.

  1. Build the framework
gradle build -x test -x integrationTest
  2. Run the Core Example app
(cd examples/core-example && gradle runApp)

Bugs and Feedback

For bugs, questions and discussions please use GitHub Issues.

Contributing

See CONTRIBUTING.md for more details.

License

MIT License

Copyright (c) 2018 Jaiden Ashmore

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

Versions

Version
3.1.1
3.1.0