kafka-reader


GroupId: com.bwsw
ArtifactId: kafka-reader_2.12
Last Version: 0.10.2
Type: jar
Description: kafka-reader
Project URL: https://github.com/bwsw/kafka-reader
Project Organization: com.bwsw
Source Code Management: https://github.com/bwsw/kafka-reader


How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.bwsw/kafka-reader_2.12/ -->
<dependency>
    <groupId>com.bwsw</groupId>
    <artifactId>kafka-reader_2.12</artifactId>
    <version>0.10.2</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.bwsw/kafka-reader_2.12/
implementation 'com.bwsw:kafka-reader_2.12:0.10.2'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.bwsw/kafka-reader_2.12/
implementation ("com.bwsw:kafka-reader_2.12:0.10.2")

Buildr:

'com.bwsw:kafka-reader_2.12:jar:0.10.2'

Ivy:

<dependency org="com.bwsw" name="kafka-reader_2.12" rev="0.10.2">
  <artifact name="kafka-reader_2.12" type="jar" />
</dependency>

Groovy Grape:

@Grapes(
@Grab(group='com.bwsw', module='kafka-reader_2.12', version='0.10.2')
)

SBT:

libraryDependencies += "com.bwsw" % "kafka-reader_2.12" % "0.10.2"

Leiningen:

[com.bwsw/kafka-reader_2.12 "0.10.2"]

Dependencies

compile (4)

Group / Artifact                  Type  Version
org.scala-lang : scala-library    jar   2.12.4
com.typesafe : config             jar   1.3.0
org.slf4j : slf4j-api             jar   1.7.25
org.apache.kafka : kafka_2.12     jar   0.10.2.1

test (1)

Group / Artifact                  Type  Version
org.scalatest : scalatest_2.12    jar   3.0.1

Project Modules

There are no modules declared in this project.

cs-kafka-reader

Kafka CloudStack Events Reader and Evaluator Framework

The library aims to make handling Kafka messages convenient. It provides mechanisms to:

  1. Buffer messages, so that the number of messages processed at a time can be varied without changing the consumer properties
  2. Implement custom logic for storing consumer offsets in any storage
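
The first mechanism can be pictured with a minimal sketch. The BufferedSource class below is hypothetical and is not the library's MessageQueue; it only assumes that records arrive from some poll function in batches whose size is fixed by the consumer, and shows how a buffer lets the caller ask for any number of messages per call.

```scala
import scala.collection.mutable

// Hypothetical illustration; not part of kafka-reader.
// `poll` stands in for a consumer call that returns a batch whose size
// is dictated by consumer properties (an empty batch means no more data).
final class BufferedSource[A](poll: () => Seq[A]) {
  private val buffer = mutable.Queue.empty[A]

  // Returns up to `n` messages, polling only when the buffer runs short.
  def take(n: Int): List[A] = {
    var exhausted = false
    while (buffer.size < n && !exhausted) {
      val batch = poll()
      if (batch.isEmpty) exhausted = true else buffer ++= batch
    }
    List.fill(math.min(n, buffer.size))(buffer.dequeue())
  }
}

object BufferedSourceDemo {
  // A fake consumer that always hands back batches of at most three records.
  def batchesOfThree(total: Int): () => Seq[Int] = {
    val it = (1 to total).grouped(3)
    () => if (it.hasNext) it.next() else Seq.empty
  }

  def main(args: Array[String]): Unit = {
    val source = new BufferedSource[Int](batchesOfThree(7))
    println(source.take(2)) // two messages, although the batch held three
    println(source.take(4)) // the leftover message plus a fresh batch
    println(source.take(5)) // fewer than requested once the data runs out
  }
}
```

The caller chooses the batch size on every call to take, while the fake consumer keeps returning batches of three; nothing about the consumer has to change.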

Install with SBT

Add the following to your build.sbt

libraryDependencies += "com.bwsw" %% "kafka-reader" % "0.10.1"

Getting Started

The diagram below is a simple illustration of how the library's classes should be used.
Implement your own EventHandler and EventManager as shown in the diagram.

[Diagram: Sequence]

where:
* K - type of the ConsumerRecord key
* V - type of the ConsumerRecord value
* T - type of the data produced when an EventHandler implementation handles a ConsumerRecord
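
The roles of K, V and T can be sketched with a few stand-in types. Record, Result, Handler and LengthHandler below are hypothetical names made up for this illustration; they are not the library's classes and only show how a handler maps records with key type K and value type V to data of type T.

```scala
// Hypothetical stand-ins for illustration; not the library's classes.
final case class Record[K, V](key: K, value: V, offset: Long)
final case class Result[T](offset: Long, data: T)

// A handler turns records with key type K and value type V into results of type T.
trait Handler[K, V, T] {
  def handle(records: List[Record[K, V]]): List[Result[T]]
}

// Here K = String, V = String and T = Int (the length of each value).
object LengthHandler extends Handler[String, String, Int] {
  def handle(records: List[Record[String, String]]): List[Result[Int]] =
    records.map(r => Result(r.offset, r.value.length))
}

object LengthHandlerDemo {
  def main(args: Array[String]): Unit =
    println(LengthHandler.handle(List(Record("k1", "hello", 0L), Record("k2", "hi", 1L))))
}
```

T is independent of K and V: it could just as well be a Future, as in the example usage of the library.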

Example Usage

The example below shows how to print messages from Kafka to the console. Each println call is wrapped in a Future:

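import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
// EventHandler, MessageQueue and OutputEnvelope come from kafka-reader;
// import them from the library's packages.

class SimpleEventHandler(messageQueue: MessageQueue[String,String], messageCount: Int)
  extends EventHandler[String,String,Future[Unit]](messageQueue, messageCount) {

  // Takes messages from the queue and prints each one asynchronously.
  override def handle(flag: AtomicBoolean): List[OutputEnvelope[Future[Unit]]] = {
    val inputEnvelopes = messageQueue.take(messageCount)
    inputEnvelopes.map { x =>
      OutputEnvelope[Future[Unit]](x.topic, x.partition, x.offset, Future(println(x.data)))
    }
  }

}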

According to the diagram above the main class looks like this:

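import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

object EventManager {

  def main(args: Array[String]): Unit = {
    val dummyFlag = new AtomicBoolean(true)
    val consumer = new Consumer[String,String](Consumer.Settings("localhost:9092", "group01", 3000))

    val checkpointInfoProcessor = new CheckpointInfoProcessor[String,String,Future[Unit]](
      TopicInfoList(List(TopicInfo(topic = "topic1"))),
      consumer
    )

    val messageQueue = new MessageQueue[String,String](consumer)

    val eventHandler = new SimpleEventHandler(messageQueue, messageCount = 1)

    checkpointInfoProcessor.load()

    val outputEnvelopes = eventHandler.handle(dummyFlag)

    // Wait for every Future to complete and rethrow the first failure
    // (assumes OutputEnvelope exposes its last constructor argument as `data`).
    outputEnvelopes.foreach { envelope =>
      Await.ready(envelope.data, Duration.Inf).value.foreach {
        case Success(_) => // the message was printed
        case Failure(e) =>
          println(s"something went wrong, exception was thrown: $e")
          throw e
      }
    }

    checkpointInfoProcessor.save(outputEnvelopes)
    consumer.close()
  }
}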
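
The save step in the example above stores offsets derived from the processed envelopes. How the library computes them is not shown here; the sketch below only illustrates the usual Kafka convention, under the assumption of a hypothetical Envelope case class and an in-memory map standing in for the storage: for each topic partition, the position to resume from is the highest processed offset plus one.

```scala
// Hypothetical types for illustration; they mirror the fields used in the
// example above but are not the library's own classes.
final case class Envelope[T](topic: String, partition: Int, offset: Long, data: T)

object OffsetCheckpoint {
  // For every (topic, partition), the offset to resume from is the highest
  // processed offset plus one.
  def nextOffsets[T](envelopes: Seq[Envelope[T]]): Map[(String, Int), Long] =
    envelopes
      .groupBy(e => (e.topic, e.partition))
      .map { case (key, group) => key -> (group.map(_.offset).max + 1L) }

  def main(args: Array[String]): Unit = {
    val processed = List(
      Envelope("topic1", 0, 5L, "a"),
      Envelope("topic1", 0, 6L, "b"),
      Envelope("topic1", 1, 2L, "c")
    )
    // Any storage could take the place of this map (a file, a database, ...).
    val store = scala.collection.mutable.Map.empty[(String, Int), Long]
    store ++= nextOffsets(processed)
    println(store.toMap)
  }
}
```

Because nextOffsets is a pure function of the envelopes, the place where the result is written can be swapped freely, which is the point of the second mechanism listed in the introduction.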

Testing

Unit tests

Run tests: sbt test

Integration tests

Run tests: sbt it:test

Versioning

The library's version number follows the version of the Apache Kafka library it depends on.

License

This project is licensed under the Apache License - see the LICENSE file for details

Versions

Version
0.10.2
0.10.1