Kafka service

Common tools for kafka

License

License

https://github.com/icerockdev/kafka-service/blob/master/LICENSE.md
GroupId

GroupId

com.icerockdev.service
ArtifactId

ArtifactId

kafka-service
Last Version

Last Version

0.1.1
Release Date

Release Date

Type

Type

pom.sha512
Description

Description

Kafka service
Common tools for kafka
Project URL

Project URL

https://github.com/icerockdev/kafka-service
Source Code Management

Source Code Management

https://github.com/icerockdev/kafka-service

Download kafka-service

Dependencies

compile (5)

Group / Artifact Type Version
org.apache.kafka : kafka-clients jar 2.3.0
com.fasterxml.jackson.core : jackson-databind jar 2.9.8
com.fasterxml.jackson.core : jackson-core jar 2.9.8
com.fasterxml.jackson.module : jackson-module-kotlin jar 2.9.8
org.jetbrains.kotlin : kotlin-stdlib-jdk8 jar 1.4.31

runtime (4)

Group / Artifact Type Version
org.jetbrains.kotlinx : kotlinx-coroutines-core-jvm jar 1.4.2
ch.qos.logback : logback-classic jar 1.2.1
com.fasterxml.jackson.datatype : jackson-datatype-joda jar 2.9.8
joda-time : joda-time jar 2.9.9

Project Modules

There are no modules declared in this project.

Kafka

Installation

// Add the repository the artifact is resolved from
repositories {
    mavenCentral()
}

// Add the library inside the module's dependencies { } block
implementation("com.icerockdev.service:kafka-service:0.1.1")

Koin configuration

Register the Kafka components in a Koin module:

// Producer
// `single` expects a definition lambda; Koin invokes it to create the shared instance.
single {
    TestProducerService(
        servers = appConf.getString("kafka.servers"),
        clientId = appConf.getString("kafka.clientId"),
        topic = "topicName"
    )
}

// Consumer
// Requires `kafka.groupId` to be present in application.conf in addition to servers/clientId.
single {
    TestKafkaConsumer(
        servers = appConf.getString("kafka.servers"),
        groupId = appConf.getString("kafka.groupId"),
        clientId = appConf.getString("kafka.clientId")
    )
}

// Executor
single {
    KafkaConsumerExecutionPool(Dispatchers.IO)
}
// application.conf
kafka {
    servers = ${KAFKA_BROKERS}
    // Consumer group id; the Koin setup reads it via appConf.getString("kafka.groupId").
    // Example value - replace with the group name used by your service.
    groupId = "auth_service_group"
    clientId = "auth_service"
}

Producer definition

/**
 * Example producer that publishes `String` payloads to a single [topic].
 *
 * The producer is created through [KafkaProducerBuilder] with the idempotence, timeout and
 * buffering presets applied, and is typed with `Long` keys and `String` values.
 *
 * @param servers forwarded to the builder as `servers`
 * @param clientId forwarded to the builder as `clientId`
 * @param topic topic every record of this service is sent to
 */
class TestProducerService(servers: String, clientId: String, private val topic: String) : AutoCloseable {
    private val producer = KafkaProducerBuilder()
//        .applyTransactional(KAFKA_TRANSACTION_ID) // supported only for 3 brokers and more
        .applyIdempotence()
        .applyTimeout()
        .applyBuffering()
        .build<Long, String>(
            servers = servers,
            clientId = clientId,
            keySerializer = LongSerializer(),
            valueSerializer = ObjectSerializer()
        )

    /**
     * Sends [model] through [KafkaSender.send] together with the current time in milliseconds
     * (presumably used as the record key - confirm against KafkaSender).
     *
     * @return the Boolean reported by [KafkaSender.send]
     */
    fun sendData(model: String): Boolean =
        KafkaSender.send(producer, topic, System.currentTimeMillis(), model)

    /**
     * Same as [sendData], but delegates to [KafkaSender.sendAsync] and discards its result.
     */
    fun sendAsyncData(model: String) {
        KafkaSender.sendAsync(producer, topic, System.currentTimeMillis(), model)
    }

    /** Flushes the producer first, then closes it. */
    override fun close() {
        producer.also { it.flush() }.close()
    }
}

Consumer definition

/**
 * Example consumer that prints the value of every record it reads.
 *
 * The consumer is created through [KafkaConsumerBuilder] with the read preset, the
 * READ_COMMITTED isolation level, a raised fetch size limit and the LATEST offset-reset value.
 *
 * @param servers first argument handed to the builder's `build`
 * @param groupId second argument handed to the builder's `build`
 * @param clientId third argument handed to the builder's `build`
 */
class TestKafkaConsumer(servers: String, groupId: String, clientId: String) : IKafkaConsumer {

    private val consumer = KafkaConsumerBuilder()
        .applyReadOpt()
        .applyIsolation(KafkaConsumerBuilder.IsolationLevel.READ_COMMITTED)
        .apply {
            // 50 * 1024 * 1024 bytes = 50 MiB
            props[ConsumerConfig.FETCH_MAX_BYTES_CONFIG] = 50 * 1024 * 1024
            props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = KafkaConsumerBuilder.Offset.LATEST.value
        }
        .build<String, String>(
            servers,
            groupId,
            clientId,
            StringDeserializer(),
            ObjectDeserializer(jacksonTypeRef())
        )

    /**
     * Registers this consumer on [executor] for the single [topic], using a 100 ms poll wait.
     *
     * The handler prints each record value and then yields `true`
     * (NOTE(review): the meaning of the returned flag is defined by `runExecutor` - confirm).
     */
    override fun run(executor: KafkaConsumerExecutionPool, topic: String) {
        val pollInterval = Duration.ofMillis(100)

        executor.runExecutor(
            consumer = consumer,
            topicList = Collections.singletonList(topic),
            pollWait = pollInterval
        ) {
            forEach { entry ->
                // apply values
                println("Read value: ${entry.value()}")
            }
            true
        }
    }

    /** Closes the underlying consumer. */
    override fun close() {
        consumer.close()
    }
}

Executors must be shut down correctly: they are AutoCloseable, so close them when the application stops.

Warning! Do not change the format of stored data (or keep it backward compatible).

Contributing

All development (both new features and bug fixes) is performed in the develop branch. This way master always contains the sources of the most recently released version. Please send PRs with bug fixes to the develop branch. Documentation fixes in the markdown files are an exception to this rule. They are updated directly in master.

The develop branch is pushed to master on release.

For more details on contributing please see the contributing guide.

License

Copyright 2019 IceRock MAG Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
com.icerockdev.service

IceRock Development

Kotlin Multiplatform developers team

Versions

Version
0.1.1