gfc-aws-kinesis-akka


License: Apache-style
Categories: AWS Container PaaS Providers Akka Microservices Reactive libraries
GroupId: com.gilt
ArtifactId: gfc-aws-kinesis-akka_2.11
Last Version: 0.17.3-RC1
Release Date:
Type: jar
Description: gfc-aws-kinesis-akka
Project Organization: com.gilt

Download gfc-aws-kinesis-akka_2.11

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.gilt/gfc-aws-kinesis-akka_2.11/ -->
<dependency>
    <groupId>com.gilt</groupId>
    <artifactId>gfc-aws-kinesis-akka_2.11</artifactId>
    <version>0.17.3-RC1</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.gilt/gfc-aws-kinesis-akka_2.11/
implementation 'com.gilt:gfc-aws-kinesis-akka_2.11:0.17.3-RC1'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.gilt/gfc-aws-kinesis-akka_2.11/
implementation ("com.gilt:gfc-aws-kinesis-akka_2.11:0.17.3-RC1")

Buildr:

'com.gilt:gfc-aws-kinesis-akka_2.11:jar:0.17.3-RC1'

Ivy:

<dependency org="com.gilt" name="gfc-aws-kinesis-akka_2.11" rev="0.17.3-RC1">
  <artifact name="gfc-aws-kinesis-akka_2.11" type="jar" />
</dependency>

Grape:

@Grapes(
@Grab(group='com.gilt', module='gfc-aws-kinesis-akka_2.11', version='0.17.3-RC1')
)

SBT:

libraryDependencies += "com.gilt" % "gfc-aws-kinesis-akka_2.11" % "0.17.3-RC1"

Leiningen:

[com.gilt/gfc-aws-kinesis-akka_2.11 "0.17.3-RC1"]

Dependencies

compile (3)

Group / Artifact                        Type  Version
org.scala-lang : scala-library          jar   2.11.12
com.gilt : gfc-aws-kinesis_2.11         jar   0.17.3-RC1
com.typesafe.akka : akka-stream_2.11    jar   2.5.6

Project Modules

There are no modules declared in this project.

gfc-aws-kinesis

A Scala wrapper around the AWS Kinesis Client Library. Part of the Gilt Foundation Classes.

Getting gfc-aws-kinesis

The latest version is 0.17.0, which is cross-built against Scala 2.11.x and 2.12.x.

SBT dependency:

libraryDependencies += "com.gilt" %% "gfc-aws-kinesis" % "0.17.0"

SBT Akka stream (2.5.x) dependency:

libraryDependencies += "com.gilt" %% "gfc-aws-kinesis-akka" % "0.17.0"
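
The `%%` operator in the SBT lines above appends the Scala binary version to the artifact name, which is why the same library is listed elsewhere on this page as gfc-aws-kinesis-akka_2.11. A minimal sketch of that naming rule follows. The helper names `binaryVersion` and `crossName` are hypothetical (they are not part of SBT or of this library), and the sketch assumes a plain "major.minor.patch" Scala release version.

```scala
object CrossNaming {
  // Hypothetical helpers for illustration only; SBT applies this rule itself for `%%`.
  // Assumes a release version of the form "major.minor.patch", e.g. "2.11.12".
  def binaryVersion(scalaVersion: String): String =
    scalaVersion.split('.').take(2).mkString(".")

  // Appends "_<binary version>" to the base artifact name.
  def crossName(artifact: String, scalaVersion: String): String =
    s"${artifact}_${binaryVersion(scalaVersion)}"
}
```

Under these assumptions, `CrossNaming.crossName("gfc-aws-kinesis-akka", "2.11.12")` yields the artifact id shown in the Maven and Gradle coordinates.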

Basic usage

Consume events:

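  import scala.concurrent.Future
  import scala.concurrent.duration._
  import com.amazonaws.services.kinesis.model.Record
  // Package name assumed from the project's source layout.
  import com.gilt.gfc.aws.kinesis.client.{KCLConfiguration, KCLWorkerRunner, KinesisRecordReader}

  // Decodes each Kinesis record payload as a UTF-8 string.
  implicit object StringRecordReader extends KinesisRecordReader[String] {
    override def apply(r: Record): String = new String(r.getData.array(), "UTF-8")
  }

  val config = KCLConfiguration("consumer-name", "kinesis-stream-name")

  KCLWorkerRunner(config).runAsyncSingleRecordProcessor[String](1.minute) { a: String =>
    // ... do something with a
    Future.successful(())
  }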
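
A record reader that decodes with `getData.array()` depends on the payload buffer exposing exactly the record's bytes: `java.nio.ByteBuffer.array()` returns the whole backing array regardless of position and limit, and it throws for read-only buffers. The sketch below shows one possible alternative that uses only the JDK. `decodeUtf8` is a hypothetical helper, not part of gfc-aws-kinesis.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object PayloadDecoding {
  // Hypothetical helper, not part of gfc-aws-kinesis.
  // Decodes only the bytes between position and limit, and reads through a
  // read-only view so the caller's buffer position is left untouched.
  def decodeUtf8(data: ByteBuffer): String =
    StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()).toString
}
```

Inside a reader, the body of `apply` could then become `PayloadDecoding.decodeUtf8(r.getData)`, assuming the payload is UTF-8 text.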

Publish events:

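  import scala.concurrent.Future
  // Package name assumed from the project's source layout.
  import com.gilt.gfc.aws.kinesis.client.{KinesisPublisher, KinesisPublisherBatchResult, KinesisRecord, KinesisRecordWriter}

  // Encodes each string as UTF-8 bytes under a fixed partition key.
  implicit object StringRecordWriter extends KinesisRecordWriter[String] {
    override def toKinesisRecord(a: String): KinesisRecord = {
      KinesisRecord("partition-key", a.getBytes("UTF-8"))
    }
  }

  val publisher = KinesisPublisher()

  val messages = Seq("Hello World!", "foo bar", "baz bam")

  val result: Future[KinesisPublisherBatchResult] = publisher.publishBatch("kinesis-stream-name", messages)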
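
The writer above uses a constant partition key. Kinesis assigns a record to a shard by taking the MD5 hash of its partition key as a 128-bit integer, so a constant key routes every record to the same shard. The following sketch computes that integer with the JDK only. `hashKey` is a hypothetical helper for illustration and is not part of gfc-aws-kinesis.

```scala
import java.math.BigInteger
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object PartitionKeys {
  // Hypothetical helper: the 128-bit hash key derived from a partition key.
  def hashKey(partitionKey: String): BigInt = {
    val digest = MessageDigest
      .getInstance("MD5")
      .digest(partitionKey.getBytes(StandardCharsets.UTF_8))
    // Signum 1 makes BigInteger read the 16 digest bytes as an unsigned value.
    BigInt(new BigInteger(1, digest))
  }
}
```

Deriving the partition key from the message itself (for example an entity id, if the messages carry one) would spread records across shards while keeping records with the same key in order.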

DynamoDB streaming

Create the adapter client

val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
    new AmazonDynamoDBStreamsAdapterClient()

Pass the adapter client in the configuration

val streamSource = {
  val streamConfig = KinesisStreamConsumerConfig[Option[A]](
    applicationName,
    config.stream,
    regionName = Some(config.region),
    checkPointInterval = config.checkpointInterval,
    initialPositionInStream = config.streamPosition,
    dynamoDBKinesisAdapterClient = streamAdapterClient
  )
  KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
}

Pass an implicit Kinesis record reader suitable for DynamoDB events

implicit val kinesisRecordReader: KinesisRecordReader[Option[A]] =
  new KinesisRecordReader[Option[A]] {
    override def apply(record: Record): Option[A] =
      record match {
        case recordAdapter: RecordAdapter =>
          val dynamoRecord: DynamoRecord = recordAdapter.getInternalObject
          dynamoRecord.getEventName match {
            // Only newly inserted items are decoded; other events are skipped.
            case "INSERT" =>
              ScanamoFree.read[A](dynamoRecord.getDynamodb.getNewImage).toOption
            case _ => None
          }
        // Records that did not come through the DynamoDB Streams adapter are skipped.
        case _ => None
      }
  }

Consume the stream, e.g. using a sink

val targetSink = Sink.actorRefWithAck(target, startMsg, ackMsg, Done)

streamSource
  .collect { case Some(value) => value }
  .log(applicationName)(log)
  .runWith(targetSink)
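
The record reader yields `Option[A]`, and the pipeline keeps only the defined values before handing them to the sink. On a plain Scala collection, two equivalent ways of doing that look like the sketch below. akka-stream's `Source` offers the same `filter`, `map`, and `collect` operators, and the sample data here is made up for illustration.

```scala
object UnwrapOptions {
  // Stand-in for decoded stream elements: None marks a skipped record.
  val decoded: List[Option[String]] = List(Some("a"), None, Some("b"))

  // Two-step form: drop the empty values, then unwrap the rest.
  val viaFilterMap: List[String] = decoded.filter(!_.isEmpty).map(_.get)

  // Single-step form: the partial function both selects and unwraps,
  // so there is no call to `get`.
  val viaCollect: List[String] = decoded.collect { case Some(value) => value }
}
```

Both forms produce the same elements in the same order; the single-step form avoids `Option.get`, which throws if it is ever applied to `None`.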

Gilt Tech (com.gilt)

Versions

Version
0.17.3-RC1
0.17.1
0.17.0
0.16.2
0.16.1
0.16.0
0.15.1
0.15.0
0.14.3
0.14.2
0.14.1
0.14.0
0.13.0
0.12.1
0.12.0
0.11.1
0.11.0
0.10.3
0.10.2
0.10.1
0.10.0