Akka Eventhubs
Akka Streams Azure Eventhubs Source and Sink
USAGE
Update your build.sbt dependencies with:
// https://mvnrepository.com/artifact/tech.navicore/akkaeventhubs
libraryDependencies += "tech.navicore" %% "akkaeventhubs" % "1.4.1"
SOURCE
Add to application.conf:
eventhubs {
  dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      core-pool-size-min = 4
      core-pool-size-factor = 2.0
      core-pool-size-max = 8
    }
    throughput = 10
    mailbox-capacity = -1
    mailbox-type = ""
  }
}
eventhubs-in {
  snapshotInterval = 100
  persist = false
  persistFreq = 1
  offsetPersistenceId = "my_example_eventhubsOffset"
  connection {
    connStr = ${EVENTHUBS_1_CONNSTR}
    partitions = ${EVENTHUBS_1_PARTITION_COUNT}
    defaultOffset = "LATEST"
    consumerGroup = "$Default"
    receiverTimeout = 120s
    receiverBatchSize = 1
    readersPerPartition = 1
  }
  dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      core-pool-size-min = 4
      core-pool-size-factor = 2.0
      core-pool-size-max = 8
    }
    throughput = 10
    mailbox-capacity = -1
    mailbox-type = ""
  }
}
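The connection block above reads EVENTHUBS_1_CONNSTR and EVENTHUBS_1_PARTITION_COUNT through `${...}` substitutions, and a required substitution that cannot be resolved makes config loading fail. A minimal preflight sketch, using only the Scala standard library, that reports which of those variables are missing before the config is loaded. The object and method names here are hypothetical and not part of the library:

```scala
object EnvPreflight {
  // Variable names taken from the ${...} substitutions in the eventhubs-in config.
  val required: Seq[String] =
    Seq("EVENTHUBS_1_CONNSTR", "EVENTHUBS_1_PARTITION_COUNT")

  // Returns the names that are absent from `env` or set to a blank value.
  def missing(env: Map[String, String], names: Seq[String] = required): Seq[String] =
    names.filter(n => env.get(n).forall(_.trim.isEmpty))

  def main(args: Array[String]): Unit = {
    val absent = missing(sys.env)
    if (absent.nonEmpty) {
      Console.err.println(s"Missing environment variables: ${absent.mkString(", ")}")
      sys.exit(1)
    }
    println("Environment looks complete; the eventhubs-in config should resolve.")
  }
}
```

Running the check at startup gives a readable message listing every missing variable at once, rather than an exception for the first unresolved substitution.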
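Ack the item once it has been processed, for a single partition source:
import com.typesafe.config.{Config, ConfigFactory}

val cfg: Config = ConfigFactory.load().getConfig("eventhubs-in")
val source1 = createPartitionSource(0, cfg)
source1.runForeach(m => {
  // take(160) truncates safely; substring(0, 160) throws on messages shorter than 160 chars
  println(s"SINGLE SOURCE: ${m._1.take(160)}")
  m._2.ack()
})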
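Ack the item once it has been processed, after merging all the partition sources:
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.Future

val consumer: Sink[(String, AckableOffset), Future[Done]] =
  Sink.foreach(m => {
    // take(160) truncates safely; substring(0, 160) throws on messages shorter than 160 chars
    println(s"SUPER SOURCE: ${m._1.take(160)}")
    m._2.ack()
  })
val toConsumer = createToConsumer(consumer)
val cfg: Config = ConfigFactory.load().getConfig("eventhubs-in")
// start one source per partition and feed them all into the shared consumer
for (pid <- 0 until EventHubConf(cfg).partitions) {
  val src: Source[(String, AckableOffset), NotUsed] =
    createPartitionSource(pid, cfg)
  src.runWith(toConsumer)
}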
With Persistence of Offsets
Change application.conf and configure Akka Persistence:
eventhubs-in {
persist = true
...
...
...
SINK
The sink expects stream elements of the following case class:
case class EventhubsSinkData(payload: Array[Byte],
                             keyOpt: Option[String] = None,
                             props: Option[Map[String, String]] = None,
                             ackable: Option[AckableOffset] = None,
                             genericAck: Option[() => Unit] = None)
payload is the message body as bytes.
keyOpt is the partition key. If not set, the sink uses a hash of the payload.
props is an optional string map whose entries are added as properties to the Eventhubs metadata for this item.
ackable is optional and is committed when the payload is successfully sent.
genericAck is an optional function that is called when the payload is successfully sent.
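The fields described above can be illustrated with a small sketch that builds one sink element with an explicit partition key, a property map, and a generic ack callback. It is meant to compile on its own, so AckableOffset is replaced by a local stand-in trait and the case class is copied from the definition above. The helper toSinkData, the "contentType" property, and the sample values are hypothetical:

```scala
object SinkDataExample {
  // Stand-in for the library's AckableOffset, declared only so this sketch compiles alone.
  trait AckableOffset { def ack(): Unit }

  // Copy of the case class shown above.
  case class EventhubsSinkData(payload: Array[Byte],
                               keyOpt: Option[String] = None,
                               props: Option[Map[String, String]] = None,
                               ackable: Option[AckableOffset] = None,
                               genericAck: Option[() => Unit] = None)

  // Hypothetical helper: key each message by device id and attach one property.
  def toSinkData(deviceId: String, json: String, onSent: () => Unit): EventhubsSinkData =
    EventhubsSinkData(
      payload = json.getBytes("UTF-8"),
      keyOpt = Some(deviceId), // explicit partition key instead of the payload hash
      props = Some(Map("contentType" -> "application/json")),
      genericAck = Some(onSent) // ackable is left as None in this sketch
    )

  def main(args: Array[String]): Unit = {
    var sent = 0
    val data = toSinkData("device-42", """{"temp":21.5}""", () => sent += 1)
    // Simulates what the sink is described as doing after a successful send.
    data.genericAck.foreach(f => f())
    println(s"key=${data.keyOpt}, bytes=${data.payload.length}, acks=$sent")
  }
}
```

Setting keyOpt is useful when related messages should share a partition key; leaving it as None falls back to the payload hash described above.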
val outConfig: Config = ConfigFactory.load().getConfig("eventhubs-out")
...
...
...
val format = Flow[(String, AckableOffset)].map((x: (String, AckableOffset)) =>
  EventhubsSinkData(x._1.getBytes("UTF8"), None, None, Some(x._2))
)
src.via(<SOME_PROCESSING_FLOW>).via(format).runWith(new EventhubsSink(EventHubConf(outConfig)))
OPS
Publish locally:
sbt +publishLocalSigned
Publish to Nexus staging:
export GPG_TTY=$(tty)
sbt +publishSigned
sbt sonatypeReleaseAll