fs2-nakadi


License: MIT
Categories: Nakadi Application Layer Libs Messaging
GroupId: io.nigo
ArtifactId: fs2-nakadi_2.12
Last Version: 0.1.0-M2
Type: jar
Description: fs2-nakadi
Project URL: https://github.com/nigozi/fs2-nakadi
Project Organization: io.nigo
Source Code Management: https://github.com/nigozi/fs2-nakadi

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/io.nigo/fs2-nakadi_2.12/ -->
<dependency>
    <groupId>io.nigo</groupId>
    <artifactId>fs2-nakadi_2.12</artifactId>
    <version>0.1.0-M2</version>
</dependency>

Gradle (Groovy):

// https://jarcasting.com/artifacts/io.nigo/fs2-nakadi_2.12/
implementation 'io.nigo:fs2-nakadi_2.12:0.1.0-M2'

Gradle (Kotlin):

// https://jarcasting.com/artifacts/io.nigo/fs2-nakadi_2.12/
implementation ("io.nigo:fs2-nakadi_2.12:0.1.0-M2")

Buildr:

'io.nigo:fs2-nakadi_2.12:jar:0.1.0-M2'

Ivy:

<dependency org="io.nigo" name="fs2-nakadi_2.12" rev="0.1.0-M2">
  <artifact name="fs2-nakadi_2.12" type="jar" />
</dependency>

Grape:

@Grapes(
    @Grab(group='io.nigo', module='fs2-nakadi_2.12', version='0.1.0-M2')
)

SBT:

libraryDependencies += "io.nigo" % "fs2-nakadi_2.12" % "0.1.0-M2"

Leiningen:

[io.nigo/fs2-nakadi_2.12 "0.1.0-M2"]

Dependencies

compile (16)

Group / Artifact Type Version
org.scala-lang : scala-library jar 2.12.8
org.typelevel : cats-free_2.12 jar 1.6.0
org.typelevel : cats-tagless-macros_2.12 jar 0.1.0
io.circe : circe-core_2.12 jar 0.10.1
io.circe : circe-java8_2.12 jar 0.10.1
io.circe : circe-parser_2.12 jar 0.10.1
io.circe : circe-derivation_2.12 jar 0.10.0-M1
org.http4s : http4s-dsl_2.12 jar 0.20.0-M6
org.http4s : http4s-blaze-client_2.12 jar 0.20.0-M6
org.http4s : http4s-blaze-server_2.12 jar 0.20.0-M6
org.http4s : http4s-core_2.12 jar 0.20.0-M6
org.http4s : http4s-circe_2.12 jar 0.20.0-M6
org.http4s : jawn-fs2_2.12 jar 0.14.2
com.beachape : enumeratum-circe_2.12 jar 1.5.20
com.typesafe.scala-logging : scala-logging_2.12 jar 3.8.0
ch.qos.logback : logback-classic jar 1.1.7

test (2)

Group / Artifact Type Version
org.specs2 : specs2-core_2.12 jar 3.8.9
org.scalatest : scalatest_2.12 jar 3.0.5

Project Modules

There are no modules declared in this project.

fs2-nakadi

fs2-nakadi is a Nakadi client for Scala based on FS2.

Under the hood

  • http4s as the underlying http client
  • circe for JSON encoding/decoding
  • fs2 for streaming

Status

Work is still in progress, but the basic DSLs are defined.

Installation

libraryDependencies += "io.nigo" %% "fs2-nakadi" % "0.1.0-M2"
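
The %% operator appends the Scala binary version to the artifact name, so the line above resolves to the fs2-nakadi_2.12 artifact listed earlier. A minimal build.sbt sketch follows. The exact scalaVersion is an assumption: any 2.12.x release should match the published artifact, and 2.12.8 is simply the scala-library version from the dependency list.

```scala
// build.sbt (sketch)

// Assumption: a 2.12.x compiler, since the artifact is published as fs2-nakadi_2.12
scalaVersion := "2.12.8"

// %% expands "fs2-nakadi" to "fs2-nakadi_2.12" for the scalaVersion above
libraryDependencies += "io.nigo" %% "fs2-nakadi" % "0.1.0-M2"
```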

Usage

Event Types

There are three main categories of event type defined by Nakadi:

  • Business Event: An event that is part of, or drives, a business process, such as a state transition in a customer order.

  • Data Change Event: An event that represents a change to a record or other item, or a new item. Change events are associated with a create, update, delete, or snapshot operation.

  • Undefined Event: A free form category suitable for events that are entirely custom to the producer.
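
The three categories above can be modelled as a small closed set of values. The sketch below is a standalone illustration that uses only the Scala standard library. EventCategory, DataOperation, wireName and fromWireName are hypothetical names and are not part of fs2-nakadi. The string values are assumed to match the ones used by the Nakadi REST API ("business", "data", "undefined" for categories; "C", "U", "D", "S" for data change operations).

```scala
// Hypothetical illustration only; these types are NOT part of fs2-nakadi.
sealed trait EventCategory { def wireName: String }

object EventCategory {
  // Assumed Nakadi REST API category strings
  case object Business   extends EventCategory { val wireName = "business" }
  case object DataChange extends EventCategory { val wireName = "data" }
  case object Undefined  extends EventCategory { val wireName = "undefined" }

  val all: List[EventCategory] = List(Business, DataChange, Undefined)

  // Look a category up by its wire name; None for unknown strings
  def fromWireName(s: String): Option[EventCategory] =
    all.find(_.wireName == s)
}

// Data change events are tied to one of four operations.
sealed trait DataOperation { def code: String }

object DataOperation {
  case object Create   extends DataOperation { val code = "C" }
  case object Update   extends DataOperation { val code = "U" }
  case object Delete   extends DataOperation { val code = "D" }
  case object Snapshot extends DataOperation { val code = "S" }
}
```

A sealed trait keeps the set closed, so the compiler can warn about non-exhaustive pattern matches when code branches on the category.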

fs2-nakadi provides a simple DSL for dealing with event types:

import cats.effect.IO
import java.net.URI
import fs2.nakadi.client._
import fs2.nakadi.model._
import fs2.nakadi.dsl._

// Define the Nakadi configuration (replace <nakadi-uri> with the URI of your Nakadi instance)
implicit val config: NakadiConfig[IO] = NakadiConfig[IO](new URI("<nakadi-uri>"))

// Define EventType
val business = EventType(
    name = EventTypeName("business-data"), 
    owningApplication = "fs2-nakadi", 
    category = Category.Business
)

// Create the EventType
EventTypeClient[IO].create(business)

// Find the EventType
EventTypeClient[IO].get(EventTypeName("business-data"))

Publish Events

You can define your own ADT for your events and publish them to the desired EventType using the Events DSL:

import cats.effect.IO
import java.util.UUID
import java.time.ZonedDateTime
import fs2.nakadi.model._
import fs2.nakadi.model.Event.Business
import fs2.nakadi.dsl._
import fs2.Stream

// Define Event ADT
case class User(id: UUID, firstName: String, lastName: String, createdAt: ZonedDateTime)

object User {
  import io.circe.{Encoder, Decoder}
  import io.circe.derivation._ 
  
  implicit val userEncoder: Encoder[User] = deriveEncoder(renaming.snakeCase)
  implicit val userDecoder: Decoder[User] = deriveDecoder(renaming.snakeCase)
}

// Define Event
val user: User = User(UUID.randomUUID(), "john", "snow", ZonedDateTime.now()) 
val event: Event[User] = Business(
  data = user,
  metadata = Metadata()
)

val eventClient = EventClient[IO]

// Publish a list of events
eventClient.publish[User](EventTypeName("user-data"), List(event))


// Publish a Stream
Stream
    .emit(event)
    .repeat
    .through(eventClient.publishStream[User](EventTypeName("user-data")))
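
The User codecs in the example above are derived with renaming.snakeCase, so the JSON keys would be expected to appear in snake_case (for instance first_name and created_at) rather than as the Scala field names. The sketch below approximates that kind of renaming with the standard library only. toSnakeCase is a hypothetical helper written for illustration. It is not circe-derivation's implementation and may differ from it in edge cases.

```scala
object SnakeCaseSketch {
  // Hypothetical helper: approximates a camelCase -> snake_case field renaming.
  def toSnakeCase(name: String): String =
    name
      // split an acronym from a following capitalised word: "HTTPServer" -> "HTTP_Server"
      .replaceAll("([A-Z]+)([A-Z][a-z])", "$1_$2")
      // split a lower-case letter or digit from a following capital: "firstName" -> "first_Name"
      .replaceAll("([a-z\\d])([A-Z])", "$1_$2")
      .toLowerCase

  def main(args: Array[String]): Unit =
    // Field names of the User case class from the example above
    List("id", "firstName", "lastName", "createdAt").foreach { field =>
      println(s"$field -> ${toSnakeCase(field)}")
    }
}
```

Keeping the renaming inside the codec means the Scala model can stay in camelCase while the payload follows the snake_case naming of the event schema.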

Consume Events

fs2-nakadi supports high-level event consumption using subscriptions:

import cats.effect.IO
import java.net.URI
import fs2.nakadi.model._
import fs2.nakadi.dsl._
import fs2.Stream

// User and its circe codecs are the ones defined in the Publish Events example
val subClient = SubscriptionClient[IO]

// Create the subscription if it doesn't exist
val sub = 
  subClient
    .createIfDoesntExist(
        Subscription(
          owningApplication = "fs2-nakadi", 
          eventTypes = Some(List(EventTypeName("user-data")))
        )
    )
    

// Create event stream
Stream
    .eval(sub)
    .flatMap(s => subClient.eventStream[User](s.id.get, StreamConfig()))

You can also use managedEventStream, which takes a callback and applies it to every event:

val callback: EventCallback[User] =
    _.subscriptionEvent.events match {
      case Some(ev) =>
        ev.foreach(e => println(s"Received Event: ${e.data.toString}"))
        true
      case _ => true
    }

Stream
    .eval(sub)
    .flatMap { s =>
      subClient.managedEventStream[User](1)(s.id.get, callback, StreamConfig())
    }

Versions

Version
0.1.0-M2
0.1.0-M1