kafka-protobuf-serde

Serializer/Deserializer for Kafka to serialize/deserialize Protocol Buffers messages

Categories: Protobuf Data Data Structures
GroupId: com.github.daniel-shuy
ArtifactId: kafka-protobuf-serde
Latest Version: 2.2.0
Type: jar
Description: Serializer/Deserializer for Kafka to serialize/deserialize Protocol Buffers messages
Project URL: https://github.com/daniel-shuy/kafka-protobuf-serde
Source Code Management: https://github.com/daniel-shuy/kafka-protobuf-serde.git

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.github.daniel-shuy/kafka-protobuf-serde/ -->
<dependency>
    <groupId>com.github.daniel-shuy</groupId>
    <artifactId>kafka-protobuf-serde</artifactId>
    <version>2.2.0</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.github.daniel-shuy/kafka-protobuf-serde/
implementation 'com.github.daniel-shuy:kafka-protobuf-serde:2.2.0'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.github.daniel-shuy/kafka-protobuf-serde/
implementation("com.github.daniel-shuy:kafka-protobuf-serde:2.2.0")

Buildr:

'com.github.daniel-shuy:kafka-protobuf-serde:jar:2.2.0'

Ivy:

<dependency org="com.github.daniel-shuy" name="kafka-protobuf-serde" rev="2.2.0">
  <artifact name="kafka-protobuf-serde" type="jar" />
</dependency>

Grape:

@Grapes(
    @Grab(group='com.github.daniel-shuy', module='kafka-protobuf-serde', version='2.2.0')
)

sbt:

libraryDependencies += "com.github.daniel-shuy" % "kafka-protobuf-serde" % "2.2.0"

Leiningen:

[com.github.daniel-shuy/kafka-protobuf-serde "2.2.0"]

Dependencies

compile (3)

Group / Artifact Type Version
org.apache.kafka : kafka-clients jar 2.2.1
com.google.protobuf : protobuf-java jar 3.8.0
org.apache.kafka : kafka-clients jar 2.2.1

test (5)

Group / Artifact Type Version
org.springframework.boot : spring-boot-starter-test jar 2.1.5.RELEASE
org.springframework.kafka : spring-kafka jar 2.2.6.RELEASE
org.springframework.kafka : spring-kafka-test jar 2.2.6.RELEASE
org.apache.kafka : kafka_2.11 jar 2.2.1
org.apache.kafka : kafka_2.11 jar 2.2.1

Project Modules

There are no modules declared in this project.

kafka-protobuf-serde

Serializer/Deserializer for Kafka to serialize/deserialize Protocol Buffers messages

Requirements

Dependency Version
Kafka 2.X.X
Protobuf 3.X.X
Java 8+

Usage

Add the following to your Maven dependency list:

<dependency>
    <groupId>com.github.daniel-shuy</groupId>
    <artifactId>kafka-protobuf-serde</artifactId>
    <version>2.2.0</version>
</dependency>

Override the protobuf.version property with the version of Protobuf you wish to use (WARNING: do not directly override the protobuf-java dependency version):

<properties>
    <protobuf.version>3.13.0</protobuf.version>
</properties>

Optionally, you may also override the kafka-clients dependency version with the version of Kafka you wish to use:

<properties>
    <kafka.version>2.3.1</kafka.version>
</properties>

Kafka Producer

Properties props = new Properties();
// props.put(..., ...);

Producer<String, MyValue> producer = new KafkaProducer<>(props,
    new StringSerializer(),
    new KafkaProtobufSerializer<>());

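// Protobuf-generated classes have no public constructor, so build the message with its builder.
producer.send(new ProducerRecord<>("topic", MyValue.newBuilder().build()));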
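
The props.put(..., ...) placeholder above stands for the usual Kafka client settings. As a minimal sketch, assuming a broker at localhost:9092, it could be filled in as follows. The class name ProducerPropsSketch, the helper producerProps and the client id are hypothetical and not part of kafka-protobuf-serde; the config keys are standard Kafka producer settings, written as plain strings so the sketch compiles without kafka-clients on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of kafka-protobuf-serde.
final class ProducerPropsSketch {

    // Builds the settings that the "props.put(..., ...)" placeholder stands for.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // broker address list
        props.put("acks", "all");                         // wait for all in-sync replicas
        props.put("client.id", "my-value-producer");      // assumed name, shown in broker logs
        // key.serializer / value.serializer are deliberately left unset:
        // the serializer instances are passed to the KafkaProducer constructor.
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("localhost:9092"));
    }
}
```

The resulting Properties object can be passed as the first argument of the KafkaProducer constructor shown above.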

Kafka Consumer

Properties props = new Properties();
// props.put(..., ...);

Consumer<String, MyValue> consumer = new KafkaConsumer<>(props,
    new StringDeserializer(),
    new KafkaProtobufDeserializer<>(MyValue.parser()));

consumer.subscribe(Collections.singleton("topic"));
ConsumerRecords<String, MyValue> records = consumer.poll(Duration.ofMillis(100));

records.forEach(record -> {
    String key = record.key();
    MyValue value = record.value();

    // ...
});
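
For the consumer, the props.put(..., ...) placeholder typically needs at least a broker address and a consumer group, because subscribe() relies on group management. The following is a sketch under those assumptions; the class name ConsumerPropsSketch and the helper consumerProps are hypothetical, while the config keys are standard Kafka consumer settings written as plain strings.

```java
import java.util.Properties;

// Hypothetical helper, not part of kafka-protobuf-serde.
final class ConsumerPropsSketch {

    // Builds the settings that the "props.put(..., ...)" placeholder stands for.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // broker address list
        props.put("group.id", groupId);                   // consumer group used by subscribe()
        props.put("auto.offset.reset", "earliest");       // where to start when no offset is committed
        // key.deserializer / value.deserializer are deliberately left unset:
        // the deserializer instances are passed to the KafkaConsumer constructor.
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("localhost:9092", "my-value-consumers"));
    }
}
```

The example above calls poll once for brevity; a long-running consumer would normally call it in a loop and close the consumer on shutdown.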

Kafka Streams

Serde<String> stringSerde = Serdes.String();
Serde<MyValue> myValueSerde = new KafkaProtobufSerde<>(MyValue.parser());

Properties config = new Properties();
// config.put(..., ...);

StreamsBuilder builder = new StreamsBuilder();
KStream<String, MyValue> myValues = builder.stream("input_topic", Consumed.with(stringSerde, myValueSerde));
KStream<String, MyValue> filteredMyValues = myValues.filter((key, value) -> {
    // ...
    return true;  // placeholder: return true to keep the record, false to drop it
});
filteredMyValues.to("output_topic", Produced.with(stringSerde, myValueSerde));

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, config);
streams.setUncaughtExceptionHandler((thread, throwable) -> {
    // ...
});
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
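
Kafka Streams requires two settings in the config object: application.id and bootstrap.servers. A minimal sketch of the config.put(..., ...) placeholder, assuming a local broker, is shown here. The class name StreamsConfigSketch and the helper streamsConfig are hypothetical; the keys are standard Kafka Streams settings written as plain strings.

```java
import java.util.Properties;

// Hypothetical helper, not part of kafka-protobuf-serde.
final class StreamsConfigSketch {

    // Builds the settings that the "config.put(..., ...)" placeholder stands for.
    static Properties streamsConfig(String applicationId, String bootstrapServers) {
        Properties config = new Properties();
        config.put("application.id", applicationId);       // also used as the consumer group id
        config.put("bootstrap.servers", bootstrapServers); // broker address list
        return config;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig("my-value-filter", "localhost:9092"));
    }
}
```

No default serdes are configured in this sketch because the topology above passes stringSerde and myValueSerde explicitly through Consumed.with and Produced.with.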

Spring for Apache Kafka (spring-kafka)

Kafka Producer

@Configuration
public class KafkaConfig {
    @Bean
    public ProducerFactory<String, MyValue> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // props.put(..., ...);

        return new DefaultKafkaProducerFactory<>(props,
                new StringSerializer(),
                new KafkaProtobufSerializer<>());
    }

    @Bean
    public KafkaTemplate<String, MyValue> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Kafka Consumer

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyValue>>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyValue> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, MyValue> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // props.put(..., ...);

        return new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(),
            new KafkaProtobufDeserializer<>(MyValue.parser()));
    }
}

public class Listener {
    @KafkaListener(id = "foo", topics = "annotated1")
    public void listen1(MyValue value) {
        // ...
    }
}

Versions

Version
2.2.0
2.1.1
2.1.0
2.0.0
1.0.0