AkkaEventhubs


License: MIT
Categories: Akka Container Microservices Reactive libraries
GroupId: tech.navicore
ArtifactId: akkaeventhubs_2.11
Last Version: 1.4.0
Release Date:
Type: jar
Description: AkkaEventhubs
Project URL: https://github.com/navicore/akka-eventhubs
Project Organization: tech.navicore
Source Code Management: https://github.com/navicore/akka-eventhubs


How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/tech.navicore/akkaeventhubs_2.11/ -->
<dependency>
    <groupId>tech.navicore</groupId>
    <artifactId>akkaeventhubs_2.11</artifactId>
    <version>1.4.0</version>
</dependency>

Gradle (Groovy):

// https://jarcasting.com/artifacts/tech.navicore/akkaeventhubs_2.11/
implementation 'tech.navicore:akkaeventhubs_2.11:1.4.0'

Gradle (Kotlin):

// https://jarcasting.com/artifacts/tech.navicore/akkaeventhubs_2.11/
implementation ("tech.navicore:akkaeventhubs_2.11:1.4.0")

Buildr:

'tech.navicore:akkaeventhubs_2.11:jar:1.4.0'

Ivy:

<dependency org="tech.navicore" name="akkaeventhubs_2.11" rev="1.4.0">
  <artifact name="akkaeventhubs_2.11" type="jar" />
</dependency>

Grape:

@Grapes(
@Grab(group='tech.navicore', module='akkaeventhubs_2.11', version='1.4.0')
)

sbt:

libraryDependencies += "tech.navicore" % "akkaeventhubs_2.11" % "1.4.0"

Leiningen:

[tech.navicore/akkaeventhubs_2.11 "1.4.0"]

Dependencies

compile (8)

Group / Artifact                                    Type  Version
org.scala-lang : scala-library                      jar   2.11.12
com.microsoft.azure : azure-eventhubs               jar   3.0.0
ch.qos.logback : logback-classic                    jar   1.2.3
com.typesafe : config                               jar   1.3.4
com.typesafe.scala-logging : scala-logging_2.11     jar   3.9.2
com.typesafe.akka : akka-actor_2.11                 jar   2.5.25
com.typesafe.akka : akka-stream_2.11                jar   2.5.25
com.typesafe.akka : akka-persistence_2.11           jar   2.5.25

test (1)

Group / Artifact                                    Type  Version
org.scalatest : scalatest_2.11                      jar   3.0.8

Project Modules

There are no modules declared in this project.


Akka Eventhubs

Akka Streams Azure Eventhubs Source and Sink

USAGE

Update your build.sbt dependencies with:

// https://mvnrepository.com/artifact/tech.navicore/akkaeventhubs
libraryDependencies += "tech.navicore" %% "akkaeventhubs" % "1.4.1"

SOURCE

Add the following to application.conf:

eventhubs {

  dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      core-pool-size-min = 4
      core-pool-size-factor = 2.0
      core-pool-size-max = 8
    }
    throughput = 10
    mailbox-capacity = -1
    mailbox-type = ""
  }

}

eventhubs-in {

  snapshotInterval = 100

  persist = false

  persistFreq = 1

  offsetPersistenceId = "my_example_eventhubsOffset"

  connection {

    connStr = ${EVENTHUBS_1_CONNSTR}

    partitions = ${EVENTHUBS_1_PARTITION_COUNT}

    defaultOffset = "LATEST"

    consumerGroup = "$Default"

    receiverTimeout = 120s

    receiverBatchSize = 1

    readersPerPartition = 1
  
  }

  dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      core-pool-size-min = 4
      core-pool-size-factor = 2.0
      core-pool-size-max = 8
    }
    throughput = 10
    mailbox-capacity = -1
    mailbox-type = ""
  }

}
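
The eventhubs-in block above takes connStr and partitions from the environment through HOCON substitutions, and a required substitution that cannot be resolved fails when the config is resolved. A minimal sketch, assuming you want a clearer error message before calling ConfigFactory.load(). The object and method names (EventhubsEnvCheck, check) are hypothetical and not part of the library; only the two variable names come from the config above.

```scala
import scala.util.Try

object EventhubsEnvCheck {
  // Variable names taken from the eventhubs-in config above.
  val ConnStrVar = "EVENTHUBS_1_CONNSTR"
  val PartitionCountVar = "EVENTHUBS_1_PARTITION_COUNT"

  /** Returns (connStr, partitions), or a message naming the first problem found. */
  def check(env: Map[String, String]): Either[String, (String, Int)] = {
    val connStr = env.get(ConnStrVar).filter(_.trim.nonEmpty)
    val partitions = env.get(PartitionCountVar)
      .flatMap(s => Try(s.trim.toInt).toOption)
      .filter(_ > 0)
    (connStr, partitions) match {
      case (Some(c), Some(p)) => Right((c, p))
      case (None, _)          => Left(s"$ConnStrVar is missing or empty")
      case (_, None)          => Left(s"$PartitionCountVar is missing or not a positive integer")
    }
  }
}
```

At startup this could be called as EventhubsEnvCheck.check(sys.env), stopping with the returned message on a Left. Taking the environment as a Map keeps the check easy to test.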

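Ack each item once it has been processed, for a single partition source:

    // Config and ConfigFactory come from com.typesafe.config; createPartitionSource
    // comes from this library. An implicit materializer must be in scope.
    val cfg: Config = ConfigFactory.load().getConfig("eventhubs-in")

    val source1 = createPartitionSource(0, cfg)

    source1.runForeach(m => {
        // take(160) is safe for payloads shorter than 160 characters,
        // where substring(0, 160) would throw
        println(s"SINGLE SOURCE: ${m._1.take(160)}")
        m._2.ack()
    })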

Ack each item once it has been processed, after merging all the partition sources:

    val consumer: Sink[(String, AckableOffset), Future[Done]] =
        Sink.foreach(m => {
            // take(160) is safe for payloads shorter than 160 characters
            println(s"SUPER SOURCE: ${m._1.take(160)}")
            m._2.ack()
        })

    val toConsumer = createToConsumer(consumer)

    val cfg: Config = ConfigFactory.load().getConfig("eventhubs-in")

    for (pid <- 0 until  EventHubConf(cfg).partitions) {

        val src: Source[(String, AckableOffset), NotUsed] =
          createPartitionSource(pid, cfg)

        src.runWith(toConsumer)

    }
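
Both snippets above call ack() only after the item has been handled. A minimal sketch of that ordering when processing can fail, written without Akka so it runs on its own. The Ackable trait is a hypothetical stand-in for the library's AckableOffset (only an ack() method is assumed), and handle is an illustrative helper, not a library function. What the library does with offsets that are never acked is not covered here.

```scala
import scala.util.{Failure, Success, Try}

object AckAfterProcessing {
  // Hypothetical stand-in for the library's AckableOffset; only ack() is assumed.
  trait Ackable { def ack(): Unit }

  /** Runs `process` on the payload and acks only if it completed without throwing.
    * Returns true when the item was acked. */
  def handle(item: (String, Ackable))(process: String => Unit): Boolean =
    Try(process(item._1)) match {
      case Success(_) =>
        item._2.ack()
        true
      case Failure(e) =>
        println(s"processing failed, not acking: ${e.getMessage}")
        false
    }
}
```

Inside a stream, the same helper could be called from the body of runForeach or Sink.foreach in place of the inline println and ack().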

With Persistence of Offsets

Change application.conf and configure Actor Persistence:

eventhubs-in {
  persist = true
...
...
...

SINK

The sink expects stream elements of the following case class:

case class EventhubsSinkData(payload: Array[Byte],
                             keyOpt: Option[String] = None,
                             props: Option[Map[String, String]] = None,
                             ackable: Option[AckableOffset] = None,
                             genericAck: Option[() => Unit] = None)
  • payload is the message body as raw bytes.
  • keyOpt is the partition key. If not set, the Sink will use a hash of the payload.
  • props is an optional string map that will add properties to the Eventhubs metadata for this item.
  • ackable is optional and will be committed when the payload is successfully sent.
  • genericAck is an optional function and will be called when the payload is successfully sent.
    val outConfig: Config = ConfigFactory.load().getConfig("eventhubs-out")
...
...
...
      val format = Flow[(String, AckableOffset)].map((x: (String, AckableOffset)) =>
        EventhubsSinkData(x._1.getBytes("UTF8"), None, None, Some(x._2))
      )

      src.via(<SOME_PROCESSING_FLOW>).via(format).runWith(new EventhubsSink(EventHubConf(outConfig)))
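
The optional fields of EventhubsSinkData can be tried out without a running Event Hub. A minimal sketch, assuming a local copy of the case class shown above and a hypothetical stand-in for AckableOffset so that it compiles without the library. The helper forDevice, the "contentType" property, and the device id used as partition key are illustrative choices, not library conventions.

```scala
object SinkDataExample {
  // Hypothetical stand-in so the sketch compiles without the library.
  trait AckableOffset { def ack(): Unit }

  // Local copy of the case class definition shown above.
  case class EventhubsSinkData(payload: Array[Byte],
                               keyOpt: Option[String] = None,
                               props: Option[Map[String, String]] = None,
                               ackable: Option[AckableOffset] = None,
                               genericAck: Option[() => Unit] = None)

  /** Builds a record with an explicit partition key, one metadata property,
    * and a callback to run after a successful send. */
  def forDevice(deviceId: String, json: String, onSent: () => Unit): EventhubsSinkData =
    EventhubsSinkData(
      payload = json.getBytes("UTF-8"),
      keyOpt = Some(deviceId),
      props = Some(Map("contentType" -> "application/json")),
      genericAck = Some(onSent))
}
```

Leaving ackable as None fits data that did not come from an Eventhubs source. The format flow above instead passes Some(x._2) so the source offset is committed after the send.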

OPS

publish local

sbt +publishLocalSigned

publish to nexus staging

export GPG_TTY=$(tty)
sbt +publishSigned
sbt sonatypeReleaseAll

Versions

Version
1.4.0
1.3.2
1.3.1
1.3.0
1.2.1
1.2.0
1.1.0
1.0.1
1.0.0
0.9.6
0.9.5
0.9.4
0.9.3
0.9.2
0.9.1
0.9.0
0.1.18
0.1.17
0.1.13
0.1.11
0.1.10
0.1.9
0.1.8
0.1.7
0.1.6
0.1.5