Kafka CDI

CDI extension for Apache Kafka

License

GroupId

com.konnectkode

ArtifactId

kafka-cdi

Last Version

0.1.0

Release Date

Type

jar

Description

Kafka CDI
CDI extension for Apache Kafka

Project URL

https://github.com/konnectkode/kafka-cdi

Source Code Management

https://github.com/konnectkode/kafka-cdi.git

Download kafka-cdi

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.konnectkode/kafka-cdi/ -->
<dependency>
    <groupId>com.konnectkode</groupId>
    <artifactId>kafka-cdi</artifactId>
    <version>0.1.0</version>
</dependency>

Gradle (Groovy):

// https://jarcasting.com/artifacts/com.konnectkode/kafka-cdi/
implementation 'com.konnectkode:kafka-cdi:0.1.0'

Gradle (Kotlin):

// https://jarcasting.com/artifacts/com.konnectkode/kafka-cdi/
implementation("com.konnectkode:kafka-cdi:0.1.0")

Buildr:

'com.konnectkode:kafka-cdi:jar:0.1.0'

Ivy:

<dependency org="com.konnectkode" name="kafka-cdi" rev="0.1.0">
  <artifact name="kafka-cdi" type="jar" />
</dependency>

Grape:

@Grapes(
  @Grab(group='com.konnectkode', module='kafka-cdi', version='0.1.0')
)

SBT:

libraryDependencies += "com.konnectkode" % "kafka-cdi" % "0.1.0"

Leiningen:

[com.konnectkode/kafka-cdi "0.1.0"]

Dependencies

compile (2)

Group / Artifact Type Version
jakarta.platform : jakarta.jakartaee-web-api jar 8.0.0
org.apache.kafka : kafka-clients jar 2.3.0

provided (1)

Group / Artifact Type Version
org.slf4j : slf4j-simple jar 1.7.26

test (10)

Group / Artifact Type Version
junit : junit jar 4.12
org.assertj : assertj-core jar 3.11.1
org.mockito : mockito-core jar 3.1.0
org.mockito : mockito-inline jar 3.1.0
org.jboss.weld : weld-core jar 2.3.5.Final
org.jboss.arquillian.junit : arquillian-junit-container jar
org.jboss.arquillian.container : arquillian-weld-ee-embedded-1.1 jar 1.0.0.Final
io.debezium : debezium-core jar 0.10.0.Final
io.debezium : debezium-core test-jar 0.10.0.Final
org.apache.kafka : kafka_2.11 jar 2.3.0

Project Modules

There are no modules declared in this project.

Kafka-CDI - A CDI extension for Apache Kafka


Getting started

Configuration is done by implementing the KafkaConfigProperties interface, qualified with @KafkaConfig. The interface is composed of three methods, commonClientConfigs(), producerConfig() and consumerConfig(); each should return only the corresponding Kafka configuration: org.apache.kafka.clients.CommonClientConfigs, org.apache.kafka.clients.producer.ProducerConfig and org.apache.kafka.clients.consumer.ConsumerConfig, respectively.

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

@KafkaConfig
public class KafkaConfigPropertiesImpl implements KafkaConfigProperties {

    @Override
    public Properties commonClientConfigs() {
        // Settings shared by producers and consumers
        Properties properties = new Properties();
        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, "client.id");

        return properties;
    }

    @Override
    public Properties producerConfig() {
        // Producer-only settings
        Properties properties = new Properties();
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional.id");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        // transaction.timeout.ms is an int config; Long.MAX_VALUE would be rejected
        properties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60_000);

        return properties;
    }

    @Override
    public Properties consumerConfig() {
        // Consumer-only settings
        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.id");
        properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        return properties;
    }

}

Creating a Kafka Consumer

The API provides the @Consumer annotation to register Kafka Consumers which internally will be used to create a vanilla KafkaConsumer.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(MyConsumer.class);

    @Consumer(topics = {"topic.one", "topic.two"}, groupId = "my-consumer-group")
    public void consume(@Value String payload) {
        LOG.info("Received message: {}", payload);
    }
}

Receiving additional information is also possible: the @Key and @Headers annotations inject the record key and the record headers.

import org.apache.kafka.common.header.Headers;

public class MyConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(MyConsumer.class);

    @Consumer(topics = {"topic.one", "topic.two"}, groupId = "my-consumer-group")
    public void consume(@Value String payload, @Key String key, @Headers Headers headers) {
        LOG.info("Received message: {}, key: {}, header: {}", payload, key, headers);
    }
}

The groupId is optional if the KafkaConfigProperties.consumerConfig() method returns the ConsumerConfig.GROUP_ID_CONFIG property. If both are present, the value from @Consumer(groupId = ...) takes precedence.
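The precedence rule above can be sketched in plain Java. This is only an illustration of the described behavior; resolveGroupId and its arguments are hypothetical names, not part of the kafka-cdi API:

```java
import java.util.Properties;

public class GroupIdResolution {

    // Hypothetical helper mirroring the documented rule: a non-empty
    // groupId from the @Consumer annotation wins over the "group.id"
    // property returned by consumerConfig().
    static String resolveGroupId(String annotationGroupId, Properties consumerConfig) {
        if (annotationGroupId != null && !annotationGroupId.isEmpty()) {
            return annotationGroupId; // annotation value takes precedence
        }
        // "group.id" is the key behind ConsumerConfig.GROUP_ID_CONFIG
        return consumerConfig.getProperty("group.id");
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("group.id", "config-group");

        // Annotation present: it wins
        System.out.println(resolveGroupId("my-consumer-group", config)); // my-consumer-group
        // Annotation absent: fall back to the config property
        System.out.println(resolveGroupId("", config)); // config-group
    }
}
```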

Deserializer

The consumer's default deserializer for both the key and the value is org.apache.kafka.common.serialization.StringDeserializer; either can be changed in the @Consumer annotation to any class that implements the org.apache.kafka.common.serialization.Deserializer interface.

public class MyConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(MyConsumer.class);

    @Consumer(topics = {"topic.one", "topic.two"}, groupId = "my-consumer-group", keyDeserializer = IntegerDeserializer.class)
    public void consume(@Value String payload, @Key Integer key) {
        LOG.info("Received message: {}, key: {}", payload, key);
    }
}

Injecting a KafkaProducer

For better flexibility and compatibility, the API provides a properly configured vanilla KafkaProducer, without wrappers or decorators. However, if the ProducerConfig.TRANSACTIONAL_ID_CONFIG property is returned by the KafkaConfigProperties.producerConfig() method, the API will be responsible for calling the KafkaProducer.initTransactions() method before any other method.

import javax.inject.Inject;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {

    @Inject
    @Producer
    private KafkaProducer<Integer, String> producer;

    public void myProducerMethod(String message) {
        producer.send(new ProducerRecord<>("kafka.topic", message));
    }
}

KafkaProducer is thread safe, so the producer is declared as @ApplicationScoped.

Versions

Version
0.1.0