franz


License

License

GroupId

GroupId

com.deciphernow
ArtifactId

ArtifactId

franz_2.11
Last Version

Last Version

1.0.0
Release Date

Release Date

Type

Type

jar
Description

Description

franz
franz
Project URL

Project URL

https://github.com/deciphernow/franz
Project Organization

Project Organization

com.deciphernow
Source Code Management

Source Code Management

https://github.com/deciphernow/franz

Download franz_2.11

How to add to project

<!-- https://jarcasting.com/artifacts/com.deciphernow/franz_2.11/ -->
<dependency>
    <groupId>com.deciphernow</groupId>
    <artifactId>franz_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
// https://jarcasting.com/artifacts/com.deciphernow/franz_2.11/
implementation 'com.deciphernow:franz_2.11:1.0.0'
// https://jarcasting.com/artifacts/com.deciphernow/franz_2.11/
implementation ("com.deciphernow:franz_2.11:1.0.0")
'com.deciphernow:franz_2.11:jar:1.0.0'
<dependency org="com.deciphernow" name="franz_2.11" rev="1.0.0">
  <artifact name="franz_2.11" type="jar" />
</dependency>
@Grapes(
@Grab(group='com.deciphernow', module='franz_2.11', version='1.0.0')
)
libraryDependencies += "com.deciphernow" % "franz_2.11" % "1.0.0"
[com.deciphernow/franz_2.11 "1.0.0"]

Dependencies

compile (5)

Group / Artifact Type Version
org.scala-lang : scala-library jar 2.11.11
com.typesafe.akka : akka-stream-kafka_2.11 jar 0.14
com.deciphernow : moby-dns jar 1.0.0
com.whisk : docker-testkit-scalatest_2.11 jar 0.9.3
com.whisk : docker-testkit-impl-spotify_2.11 jar 0.9.3

test (1)

Group / Artifact Type Version
org.scalatest : scalatest_2.11 jar 3.0.1

Project Modules

There are no modules declared in this project.

Franz

A library for working with Kafka via Akka streams.

Usage

To use Franz within a Maven project add the following dependency:

<dependency>
    <groupId>com.deciphernow</groupId>
    <artifactId>franz</artifactId>
    <version>1.0.0</version>
</dependency>

To use Franz within an SBT project add the following dependency:

libraryDependencies += "com.deciphernow" % "franz" % "1.0.0"

Consuming from Kafka

Franz supports two methods of consuming records from Kafka: fetching and flowing. Fetching provides a means to retrieve a set number of records from Kafka while flowing allows you to continually stream records from Kafka. In most production use cases, you will be flowing from Kafka, but in a limited set of cases (e.g., testing) you simply need to fetch from Kafka. In either case Franz supports ephemeral consumers (i.e. consumers that do not track offsets) and persistent consumers (i.e., consumers that track offsets).

Fetching

The following line demonstrates how to fetch from Kafka:

Fetching.ephemera(settings, subscription, 10, Duration(10, SECONDS))

The above code will return ten records from Kafka or, if ten records are not available after waiting ten seconds, it will return all available records. To execute the same action but persist the offsets to Kafka so that subsequent calls do not retrieve the same records do the following:

Fetching.persistent(settings, subscription, 10, Duration(10, SECONDS))

Flowing

To flow records from Kafka do the following:

val flow = Flow[ConsumerRecord[K, V]].map { record =>
  /* Process the record here */
  Done
}

val future = Flowing.ephemeral(settings, subscription, flow)

If you desire to process records in parallel simply modify your flow to something like:

val flow = Flow[ConsumerRecord[K, V]].mapAsync(10) { record =>
  Future {
    /* Process the record here */
    Done
  }
}

This will allow for up to ten records to be processed in parallel.

If you need to commit offsets back to Kafka (in most use cases this is required) you should do the following:

val flow = Flow[ConsumerRecord[K, V]].map { record =>
  /* Process the record here */
  Done
}

val future = Flowing.persistent(settings, subscription, flow, 1, 10, 1)

The above will process one record at a time with the provided flow, but allows for commits to be batched in groups of up to ten records which are processed in a single thread. You can also process records in parallel but must ensure that the level of parallelism of the provided flow matches the flowParallelism parameter passed to Flowing.persistent. Failure to do so will result in either a blocking flow or buffer overflow.

val flow = Flow[ConsumerRecord[K, V]].mapAsync(25) { record =>
  /* Process the record here */
  Done
}

val future = Flowing.persistent(settings, subscription, flow, 25, 10, 1)

Note that commiting records to Kafka is expensive and allowing for larger batches or more parallelism will speed up your flow. However, larger batches may aslo result in more messages being reprocessed in failure cases.

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request
com.deciphernow

Decipher Technology Studios

Software for your sixth sense

Versions

Version
1.0.0