Reactive Kafka

A high-level Kafka consumer that wraps the low-level API of Reactor Kafka and offers usability similar to Spring Kafka.

Categories: React User Interface Web Frameworks
GroupId: com.quandoo.lib
ArtifactId: reactive-kafka
Last Version: 1.5.1
Type: jar
Project URL: https://github.com/quandoo/reactive-kafka
Source Code Management: http://github.com/quandoo/reactive-kafka.git

How to add to project

Maven:
<!-- https://jarcasting.com/artifacts/com.quandoo.lib/reactive-kafka/ -->
<dependency>
    <groupId>com.quandoo.lib</groupId>
    <artifactId>reactive-kafka</artifactId>
    <version>1.5.1</version>
</dependency>

Gradle (Groovy DSL):
// https://jarcasting.com/artifacts/com.quandoo.lib/reactive-kafka/
implementation 'com.quandoo.lib:reactive-kafka:1.5.1'

Gradle (Kotlin DSL):
// https://jarcasting.com/artifacts/com.quandoo.lib/reactive-kafka/
implementation ("com.quandoo.lib:reactive-kafka:1.5.1")

Buildr:
'com.quandoo.lib:reactive-kafka:jar:1.5.1'

Ivy:
<dependency org="com.quandoo.lib" name="reactive-kafka" rev="1.5.1">
  <artifact name="reactive-kafka" type="jar" />
</dependency>

Grape:
@Grapes(
    @Grab(group='com.quandoo.lib', module='reactive-kafka', version='1.5.1')
)

SBT:
libraryDependencies += "com.quandoo.lib" % "reactive-kafka" % "1.5.1"

Leiningen:
[com.quandoo.lib/reactive-kafka "1.5.1"]

Dependencies

compile (5)

Group : Artifact | Type | Version
io.reactivex.rxjava2 : rxjava | jar | 2.2.14
io.projectreactor.addons : reactor-adapter | jar | 3.3.0.RELEASE
io.projectreactor.kafka : reactor-kafka | jar | 1.2.1.RELEASE
org.apache.kafka : kafka-clients | jar | 2.3.1
com.github.daniel-shuy : kafka-jackson-serializer | jar | 0.1.2

runtime (10)

Group : Artifact | Type | Version
org.jetbrains.kotlin : kotlin-stdlib-jdk8 | jar | 1.3.50
org.reflections : reflections | jar | 0.9.9
com.fasterxml.jackson.core : jackson-core | jar | 2.10.0
com.fasterxml.jackson.core : jackson-databind | jar | 2.10.0
com.fasterxml.jackson.datatype : jackson-datatype-jsr310 | jar | 2.10.0
com.fasterxml.jackson.datatype : jackson-datatype-jdk8 | jar | 2.10.0
com.fasterxml.jackson.module : jackson-module-kotlin | jar | 2.10.0
org.apache.commons : commons-lang3 | jar | 3.9
com.google.guava : guava | jar | 28.1-jre
org.slf4j : slf4j-api | jar | 1.7.28

Reactive Kafka

A high-level Kafka consumer that wraps the low-level API of Reactor Kafka and offers usability similar to Spring Kafka.

Dependency

implementation("com.quandoo.lib:reactive-kafka:1.4.0")

Usage

Spring

The configuration is auto-discovered, so you only need to include the artifact in your project and add a YAML configuration.

Properties

kafka:
  bootstrap-servers: "localhost:9092"                                                     # Kafka servers
  security-protocol: "SSL"                                                                # Security protocol used (Default: PLAINTEXT)
  client-dns-lookup: "use_all_dns_ips"                                                    # Dns lookup (Default: use_all_dns_ips)
  consumer:
    group-id: ${spring.application.name}                                                  # Kafka groupId
    parallelism: 1                                                                        # How many parallel consumptions (Default: 1)
    auto-offset-reset: earliest                                                           # Offset reset (Default: latest)
    batch-size: 10                                                                        # Max number of messages per one batch (Default: 10)
    partition-assignment-strategy: "org.apache.kafka.clients.consumer.RangeAssignor"      # How to assign partitions (Default: org.apache.kafka.clients.consumer.RangeAssignor)
    commit-interval: 200                                                                  # Max time to wait until the committed messages are synced with kafka (Default: 200)
    commit-batch-size: 10                                                                 # Max number of uncommitted messages until the committed messages are synced with kafka (Default: batch-size) 
    heart-beat-interval-millis: 3000                                                      # Heart-beat period (Default: 3000)
    session-timeout-millis: 10000                                                         # Session timeout (Default: 10000)
    retry-backoff-millis: 100                                                             # How long to backoff until retrying again (Default: 100)
    max-pool-interval-millis: 300000                                                      # Max interval between two polls (Default: 300000)
  producer:
    max-in-flight: 10                                                                     # Max number of unacknowledged in-flight messages
  
  # Documented in official kafka client
  ssl:
    endpoint-identification-algorithm: ""
    protocol: ""
    enabled-protocols: ""
    provider: ""
    cypher-suites: ""
    keystore-type: ""
    keystore-location: ""
    keystore-password: ""
    key-password: ""
    truststore-type: ""
    truststore-location: ""
    truststore-password: ""
    keymanager-algorithm: ""
    trustmanager-algorithm: ""
    secure-random-implementation: ""
  # Documented in official kafka client
  sasl:
    mechanism: ""
    jaas: ""
    client-callback-handler-class: ""
    login-callback-handler-class: ""
    login-class: ""
    kerbos-service-name: ""
    kerbos-kinit-cmd: ""
    kerbos-ticket-renew-window-factor: 0.5
    kerbos-ticket-renew-jitter: 0.5
    kerbos-min-time-before-relogin: 100
    login-refresh-window-factor: 100
    login-refresh-window-jitter: 100
    login-refresh-min-period-seconds: 10
    login-refresh-buffer-seconds: 10

All consumer properties can also be specified or overridden in the listener annotation.
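
The comments on commit-interval and commit-batch-size describe two thresholds for syncing acknowledged offsets with Kafka: a maximum wait time and a maximum number of pending messages. The following is a minimal sketch of such a policy, not the library's code. It assumes that a commit is due as soon as either threshold is reached and that the interval is given in milliseconds; the README states neither explicitly. The class CommitPolicy and its methods are hypothetical names used only for illustration.

```java
import java.time.Duration;

/** Hypothetical illustration of a two-threshold commit policy; not part of reactive-kafka. */
final class CommitPolicy {
    private final int commitBatchSize;      // assumed meaning of commit-batch-size
    private final Duration commitInterval;  // assumed meaning of commit-interval
    private int pending = 0;                // acknowledged but not yet committed
    private long lastCommitMillis;

    CommitPolicy(int commitBatchSize, Duration commitInterval, long nowMillis) {
        this.commitBatchSize = commitBatchSize;
        this.commitInterval = commitInterval;
        this.lastCommitMillis = nowMillis;
    }

    /** Records one acknowledged message and reports whether a commit is due. */
    boolean acknowledge(long nowMillis) {
        pending++;
        return shouldCommit(nowMillis);
    }

    /** A commit is due when something is pending and either threshold has been reached. */
    boolean shouldCommit(long nowMillis) {
        if (pending == 0) {
            return false;
        }
        boolean batchFull = pending >= commitBatchSize;
        boolean intervalElapsed = nowMillis - lastCommitMillis >= commitInterval.toMillis();
        return batchFull || intervalElapsed;
    }

    /** Resets the counters once the offsets have been synced. */
    void committed(long nowMillis) {
        pending = 0;
        lastCommitMillis = nowMillis;
    }
}
```

Under these assumptions, a small commit-batch-size bounds how many messages may be redelivered after a crash, while commit-interval bounds how long a slow topic can go without a commit.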

Consumer configuration

The function that handles a message must return an RxJava2 Completable or a Reactor Mono. The name parameter groups listeners and filters: a filter applies only to the listeners that share its name.
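
The grouping rule can be pictured as a lookup by name. The sketch below is a hypothetical model, not the library's implementation: FilterGrouping and filterFor are invented names, plain strings stand in for records, and combining several filters of one group with a logical AND is an assumption.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical model of name-based filter grouping; not part of reactive-kafka. */
final class FilterGrouping {
    private FilterGrouping() {
    }

    /**
     * Builds the predicate a listener would see: every filter registered under
     * the listener's name must accept the value (assumption: logical AND).
     * A listener whose name has no filters accepts everything.
     */
    static Predicate<String> filterFor(String listenerName,
                                       Map<String, List<Predicate<String>>> filtersByName) {
        Predicate<String> combined = value -> true;
        for (Predicate<String> filter
                : filtersByName.getOrDefault(listenerName, Collections.emptyList())) {
            combined = combined.and(filter);
        }
        return combined;
    }
}
```

In this model a filter registered under "test-consumer" never affects a listener registered under another name.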

Single Listener

      // Topics support SpEL expressions
      @KafkaListener(groupId = "test-consumer", topics = {"topic1", "topic2"}, valueType = DTO.class)
      public Completable processMessage(final ConsumerRecord<String, DTO> message) {
          // Process the record, then signal completion
          return Completable.complete();
      }

Batch Listener

      // Topics support SpEL expressions
      @KafkaListener(groupId = "test-consumer", topics = {"topic1", "topic2"}, valueType = DTO.class)
      public Mono<Void> processMessage(final List<ConsumerRecord<String, DTO>> messages) {
          // Process the whole batch, then signal completion
          return Mono.empty();
      }

Filter

Filters messages after the key and value deserializers have run.

      @Component
      @KafkaListenerFilter(groupId = "test-consumer", valueClass = DTO.class)
      public class VersionFilter implements Predicate<ConsumerRecord<Object, Object>> {

          // Predicate is assumed to be Guava's com.google.common.base.Predicate
          @Override
          public boolean apply(final ConsumerRecord<Object, Object> consumerRecord) {
              return true; // Keep every record
          }
      }

Pre-Filter

Filters messages before the key and value deserializers run.

      @Component
      @KafkaListenerPreFilter(groupId = "test-consumer")
      public class VersionFilter implements Predicate<ConsumerRecord<Bytes, Bytes>> {

          // Predicate is assumed to be Guava's com.google.common.base.Predicate
          @Override
          public boolean apply(final ConsumerRecord<Bytes, Bytes> consumerRecord) {
              return true; // Keep every record
          }
      }
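
The two filter kinds differ only in where they sit relative to deserialization. The sketch below models that ordering with plain Java types; it is a hypothetical illustration, not the library's implementation. FilterPipeline and process are invented names, and a byte array stands in for the raw record.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical model of the pre-filter / deserialize / filter order; not part of reactive-kafka. */
final class FilterPipeline<V> {
    private final Predicate<byte[]> preFilter;       // sees the raw payload
    private final Function<byte[], V> deserializer;  // runs only if the pre-filter accepts
    private final Predicate<V> filter;               // sees the deserialized value

    FilterPipeline(Predicate<byte[]> preFilter,
                   Function<byte[], V> deserializer,
                   Predicate<V> filter) {
        this.preFilter = preFilter;
        this.deserializer = deserializer;
        this.filter = filter;
    }

    /** Returns the value to hand to the listener, or empty if either filter rejected it. */
    Optional<V> process(byte[] raw) {
        if (!preFilter.test(raw)) {
            return Optional.empty(); // rejected before any deserialization work
        }
        V value = deserializer.apply(raw);
        return filter.test(value) ? Optional.of(value) : Optional.empty();
    }
}
```

A pre-filter is the cheaper place to drop records whenever the decision can be made on the raw bytes, because rejected records never reach the deserializer.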

Producer

      @Autowired
      private KafkaSender<String, DTO> kafkaSender;

Limitations

The current implementation supports only string keys and JSON message bodies. It uses the ObjectMapper defined in the Spring context.

Manual

      public void createConsumer() {
          // Consumer settings
          final KafkaProperties.KafkaConsumerProperties kafkaConsumerProperties = new KafkaProperties.KafkaConsumerProperties();
          kafkaConsumerProperties.setGroupId("test-consumer");
          kafkaConsumerProperties.setAutoOffsetReset("earliest");

          // Producer settings
          final KafkaProperties.KafkaProducerProperties kafkaProducerProperties = new KafkaProperties.KafkaProducerProperties();
          kafkaProducerProperties.setMaxInFlight(10);

          // Shared connection settings
          final KafkaProperties kafkaProperties = new KafkaProperties();
          kafkaProperties.setBootstrapServers("localhost:9092");
          kafkaProperties.setConsumer(kafkaConsumerProperties);
          kafkaProperties.setProducer(kafkaProducerProperties);

          // Listener metadata: handler, topics, types, deserializers and two always-true predicates
          final KafkaListenerMeta<? extends String, ? extends String> kafkaListenerMeta = new KafkaListenerMeta(
                  message -> {
                      // Handle
                      return Completable.complete();
                  },
                  ImmutableList.of("topic1"),
                  String.class,
                  String.class,
                  new StringDeserializer(),
                  new StringDeserializer(),
                  Predicates.alwaysTrue(),
                  Predicates.alwaysTrue()
          );

          final KafkaConsumer kafkaConsumer = new KafkaConsumer(kafkaProperties, ImmutableList.of(kafkaListenerMeta));
          kafkaConsumer.start();
      }

License

Apache License, Version 2.0

com.quandoo.lib

Quandoo

Public repository of Quandoo open-source projects

Versions

Version
1.5.1
1.5.0
1.4.0
1.3.1
1.3.0
1.2.4
1.2.3
1.2.2
1.2.1
1.2.0
1.1.0
1.0.3
1.0.2
1.0.1
1.0.0