kafka-jackson-serializer

Serializer/Deserializer for Kafka that uses Jackson to marshall/unmarshall Objects to/from JSON

License

License

Categories

Categories

Jackson Data JSON
GroupId

GroupId

com.github.daniel-shuy
ArtifactId

ArtifactId

kafka-jackson-serializer
Last Version

Last Version

0.1.2
Release Date

Release Date

Type

Type

jar
Description

Description

kafka-jackson-serializer
Serializer/Deserializer for Kafka that uses Jackson to marshall/unmarshall Objects to/from JSON
Project URL

Project URL

https://github.com/daniel-shuy/kafka-jackson-serializer
Source Code Management

Source Code Management

https://github.com/daniel-shuy/kafka-jackson-serializer.git

Download kafka-jackson-serializer

How to add to project

<!-- https://jarcasting.com/artifacts/com.github.daniel-shuy/kafka-jackson-serializer/ -->
<dependency>
    <groupId>com.github.daniel-shuy</groupId>
    <artifactId>kafka-jackson-serializer</artifactId>
    <version>0.1.2</version>
</dependency>
// https://jarcasting.com/artifacts/com.github.daniel-shuy/kafka-jackson-serializer/
implementation 'com.github.daniel-shuy:kafka-jackson-serializer:0.1.2'
// https://jarcasting.com/artifacts/com.github.daniel-shuy/kafka-jackson-serializer/
implementation ("com.github.daniel-shuy:kafka-jackson-serializer:0.1.2")
'com.github.daniel-shuy:kafka-jackson-serializer:jar:0.1.2'
<dependency org="com.github.daniel-shuy" name="kafka-jackson-serializer" rev="0.1.2">
  <artifact name="kafka-jackson-serializer" type="jar" />
</dependency>
@Grapes(
@Grab(group='com.github.daniel-shuy', module='kafka-jackson-serializer', version='0.1.2')
)
libraryDependencies += "com.github.daniel-shuy" % "kafka-jackson-serializer" % "0.1.2"
[com.github.daniel-shuy/kafka-jackson-serializer "0.1.2"]

Dependencies

compile (2)

Group / Artifact Type Version
org.apache.kafka : kafka-clients jar 1.0.0
com.fasterxml.jackson.core : jackson-databind jar 2.9.0

test (2)

Group / Artifact Type Version
org.junit.jupiter : junit-jupiter-api jar 5.1.0
com.github.charithe : kafka-junit jar 4.1.0

Project Modules

There are no modules declared in this project.

kafka-jackson-serializer

This project is no longer being maintained (however, the released artifacts will still remain in Maven Central). Use spring-kafka's JsonSerializer/JsonDeserializer/JsonSerde instead.

Serializer/Deserializer for Kafka that uses Jackson to marshall/unmarshall Objects to/from JSON

Requirements

Dependency Version
Kafka 1.X.X
Jackson 2.9.X
Java 7+

Usage

Add the following to your Maven dependency list:

<dependency>
    <groupId>com.github.daniel-shuy</groupId>
    <artifactId>kafka-jackson-serializer</artifactId>
    <version>0.1.2</version>
</dependency>

Override the kafka-client dependency version with the version of Kafka you wish to use:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

Override the jackson-databind dependency version with the version of Jackson you wish to use:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>

Kafka Producer

ObjectMapper mapper = new ObjectMapper();
// mapper.configure(..., ...);

Properties props = new Properties();
// props.put(..., ...);

Producer<String, MyValue> producer = new KafkaProducer<>(props,
    new StringSerializer(),
    new KafkaJacksonSerializer(mapper));

producer.send(new ProducerRecord<>("topic", new MyValue()));

Kafka Consumer

ObjectMapper mapper = new ObjectMapper();
// mapper.configure(..., ...);

Properties props = new Properties();
// props.put(..., ...);

Consumer<String, MyValue> consumer = new KafkaConsumer<>(props,
    new StringDeserializer(),
    new KafkaJacksonDeserializer(mapper, MyValue.class));

consumer.subscribe("topic");
ConsumerRecords<String, MyValue> records = consumer.poll(100);

records.forEach(record -> {
    String key = record.key();
    MyValue value = record.value();

    // ...
});

Kafka Streams

ObjectMapper mapper = new ObjectMapper();
// mapper.configure(..., ...);

Serde<String> stringSerde = Serdes.String();
Serde<MyValue> myValueSerde = Serdes.serdeFrom(
        new KafkaJacksonSerializer(mapper), 
        new KafkaJacksonDeserializer(mapper, MyValue.class));

Properties config = new Properties();
// config.put(..., ...);

StreamsBuilder builder = new StreamsBuilder();
KStream<String, MyValue> myValues = builder.stream("input_topic", Consumed.with(stringSerde, myValueSerde));
KStream<String, MyValue> filteredMyValues = myValues.filter((key, value) -> {
    // ...
});
filteredMyValues.to("output_topic", Produced.with(stringSerde, myValueSerde));

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, config);
streams.setUncaughtExceptionHandler((thread, throwable) -> {
    // ...
});
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();

Versions

Version
0.1.2
0.1.1
0.1.0