Club Topicana

Ensure your Kafka Topics are running as expected at application startup

License

License

GroupId

GroupId

com.github.ftrossbach
ArtifactId

ArtifactId

club-topicana
Last Version

Last Version

0.1.0
Release Date

Release Date

Type

Type

pom
Description

Description

Club Topicana
Ensure your Kafka Topics are running as expected at application startup
Project URL

Project URL

https://github.com/ftrossbach/club-topicana
Source Code Management

Source Code Management

https://github.com/ftrossbach/club-topicana

Download club-topicana

Filename Size
club-topicana-0.1.0.pom 9 KB
Browse

How to add to project

<!-- https://jarcasting.com/artifacts/com.github.ftrossbach/club-topicana/ -->
<dependency>
    <groupId>com.github.ftrossbach</groupId>
    <artifactId>club-topicana</artifactId>
    <version>0.1.0</version>
    <type>pom</type>
</dependency>
// https://jarcasting.com/artifacts/com.github.ftrossbach/club-topicana/
implementation 'com.github.ftrossbach:club-topicana:0.1.0'
// https://jarcasting.com/artifacts/com.github.ftrossbach/club-topicana/
implementation ("com.github.ftrossbach:club-topicana:0.1.0")
'com.github.ftrossbach:club-topicana:pom:0.1.0'
<dependency org="com.github.ftrossbach" name="club-topicana" rev="0.1.0">
  <artifact name="club-topicana" type="pom" />
</dependency>
@Grapes(
@Grab(group='com.github.ftrossbach', module='club-topicana', version='0.1.0')
)
libraryDependencies += "com.github.ftrossbach" % "club-topicana" % "0.1.0"
[com.github.ftrossbach/club-topicana "0.1.0"]

Dependencies

test (4)

Group / Artifact Type Version
org.junit.jupiter : junit-jupiter-api jar 5.0.1
junit : junit jar 4.12
org.junit.jupiter : junit-jupiter-engine jar 5.0.1
org.mockito : mockito-core jar 1.10.19

Project Modules

  • core
  • kafka-clients
  • kafka-streams
  • spring

Build Status Coverage Status License

Club Topicana

... check are free.

What does it do?

This library allows you to check the configuration of the Kafka topics you depend on before you produce or consume from them. This project was motivated by a situation in a project where Kafka was managed by a central team that did not want to disable automatic topic creation. So at some point, we ended up with a topic with just partition count and replication factor set to 1. Increasing a partition count is fairly easy, so folks did it right away. Replication factor? Not so much fun – you basically need to do a manual partition reassignment. Not fun for 50 partitions.

Club Topicana allows you to specify your expected topic configuration in advance (programmatically or in a YAML file) and then allows you to execute a check if it matches the real topic configuration when you create a Kafka Producer, Consumer, Stream or a Spring application

Configuration

A YAML config looks like this

- name: test_topic
  replication-factor: 1
  partition-count: 1
  config:
    - cleanup.policy: delete
    - delete.retention.ms: 86400000

- name: test_topic2
  replication-factor: 1
  config:
    - compression.type: producer
    - file.delete.delay.ms: 60000

This can be parsed in the following way:

Collection<ExpectedTopicConfiguration> expectedConfig = new ConfigParser().parseTopicConfiguration("classpath-location-of-file");

The programmatic equivalent looks like this:

ExpectedTopicConfiguration testTopic = new ExpectedTopicConfiguration.ExpectedTopicConfigurationBuilder("test_topic")
                                      .withReplicationFactor(1)
                                      .withPartitionCount(1)
                                      .withConfig("delete.retention.ms", "86400000")
                                      .withConfig("cleanup.policy", "delete")
                                      .build();
                                      
ExpectedTopicConfiguration testTopic2 = new ExpectedTopicConfiguration.ExpectedTopicConfigurationBuilder("test_topic2")
                                      .withReplicationFactor(1)
                                      .withConfig("compression.type", "producer")
                                      .withConfig("file.delete.delay.ms", "60000")
                                      .build();

Every parameter is optional – if no partition count is specified, Club Topicana assumes you don't care. Config properties are also only checked if they're specifically included

Kafka Clients and Streams

Club Topicana contains factories that extend the default constructors of KafkaProducer and KafkaConsumer with another parameter expecting a collection of ExpectedTopicConfiguration. If the config doesn't fit, those factory methods will throw a MismatchedTopicConfigException` ìf something goes wrong.

Examples:

Producer:

ExpectedTopicConfiguration expected = new ExpectedTopicConfiguration.ExpectedTopicConfigurationBuilder("test_topic")
                                      .withReplicationFactor(2).build();
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String,String> producer = KafkaProducerFactory.producer(props, Collections.singleton(expected));

Consumer:

ExpectedTopicConfiguration expected = new ExpectedTopicConfiguration.ExpectedTopicConfigurationBuilder("test_topic").build();

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<String, String> consumer = KafkaConsumerFactory.consumer(props, new StringDeserializer(), new StringDeserializer(), Collections.singleton(expected));

Streams:

ExpectedTopicConfiguration expected = new ExpectedTopicConfiguration.ExpectedTopicConfigurationBuilder("test_topic").build();

Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
[..]
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>table("test_topic").groupBy((key, value) -> new KeyValue<>(key, value)).count("store");

KafkaStreams streams = KafkaStreamsFactory.streams(builder.build(), new StreamsConfig(props), Collections.singleton(expected));

For Kafka producers and consumers, you may depend on

<dependency>
    <groupId>com.github.ftrossbach</groupId>
    <artifactId>club-topicana-kafka-clients</artifactId>
    <version>0.1.0</version>
</dependency>

For Kafka Streams, you may use

<dependency>
    <groupId>com.github.ftrossbach</groupId>
    <artifactId>club-topicana-kafka-streams</artifactId>
    <version>0.1.0</version>
</dependency>

Spring

For Spring applications, all you need to do is to use the "EnableClubTopicana" annotation:

@SpringBootApplication
@EnableClubTopicana
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
}

These are the configuration options:

Property Optional Default
club-topicana.bootstrap-servers No None (Example: "localhost:9092"
club-topicana.config-file:club-topicana.yml Yes club-topicana.yml
club-topicana.fail-on-mismatch:false Yes true

You can include it in your project by adding

<dependency>
    <groupId>com.github.ftrossbach</groupId>
    <artifactId>club-topicana-spring</artifactId>
    <version>0.1.0</version>
</dependency>

Versions

Version
0.1.0