gfc-aws-kinesis-akka


Categories: AWS Container PaaS Providers Akka Microservices Reactive libraries
GroupId: org.gfccollective
ArtifactId: gfc-aws-kinesis-akka_2.13
Last Version: 1.0.0
Type: jar
Description: gfc-aws-kinesis-akka
Project URL: https://github.com/gfc-collective/gfc-aws-kinesis
Project Organization: org.gfccollective
Source Code Management: https://github.com/gfc-collective/gfc-aws-kinesis.git

Download gfc-aws-kinesis-akka_2.13

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/org.gfccollective/gfc-aws-kinesis-akka_2.13/ -->
<dependency>
    <groupId>org.gfccollective</groupId>
    <artifactId>gfc-aws-kinesis-akka_2.13</artifactId>
    <version>1.0.0</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/org.gfccollective/gfc-aws-kinesis-akka_2.13/
implementation 'org.gfccollective:gfc-aws-kinesis-akka_2.13:1.0.0'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/org.gfccollective/gfc-aws-kinesis-akka_2.13/
implementation ("org.gfccollective:gfc-aws-kinesis-akka_2.13:1.0.0")

Buildr:

'org.gfccollective:gfc-aws-kinesis-akka_2.13:jar:1.0.0'

Ivy:

<dependency org="org.gfccollective" name="gfc-aws-kinesis-akka_2.13" rev="1.0.0">
  <artifact name="gfc-aws-kinesis-akka_2.13" type="jar" />
</dependency>

Grape:

@Grapes(
    @Grab(group='org.gfccollective', module='gfc-aws-kinesis-akka_2.13', version='1.0.0')
)

SBT:

libraryDependencies += "org.gfccollective" % "gfc-aws-kinesis-akka_2.13" % "1.0.0"

Leiningen:

[org.gfccollective/gfc-aws-kinesis-akka_2.13 "1.0.0"]

Dependencies

compile (3)

Group / Artifact                          Type  Version
org.scala-lang : scala-library            jar   2.13.1
org.gfccollective : gfc-aws-kinesis_2.13  jar   1.0.0
com.typesafe.akka : akka-stream_2.13      jar   2.6.1

Project Modules

There are no modules declared in this project.

gfc-aws-kinesis

A Scala wrapper around the AWS Kinesis Client Library.

A fork and new home of the unmaintained Gilt Foundation Classes (com.gilt.gfc), now called the GFC Collective and maintained by some of the original authors.

Getting gfc-aws-kinesis

The latest version is 1.0.0, released on 21 January 2020. It is cross-built against Scala 2.12.x and 2.13.x.

If you're using SBT, add the following line to your build file:

libraryDependencies += "org.gfccollective" %% "gfc-aws-kinesis" % "1.0.0"

For the Akka Streams integration, add the following SBT dependency (version 1.0.0 for Scala 2.13 is built against akka-stream 2.6.1):

libraryDependencies += "org.gfccollective" %% "gfc-aws-kinesis-akka" % "1.0.0"

For Maven and other build tools, you can visit search.maven.org. (This search will also list other available libraries from the GFC Collective.)

Basic usage

Consume events:

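  import scala.concurrent.Future
  import scala.concurrent.duration._

  // Tells the library how to turn a raw Kinesis record into a String.
  implicit object StringRecordReader extends KinesisRecordReader[String] {
    override def apply(r: Record): String = new String(r.getData.array(), "UTF-8")
  }

  val config = KCLConfiguration("consumer-name", "kinesis-stream-name")

  // The handler is called once per decoded record and returns a Future.
  KCLWorkerRunner(config).runAsyncSingleRecordProcessor[String](1.minute) { a: String =>
    // ... do something with a
    Future.successful(())
  }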
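
The reader above calls `r.getData.array()`, which returns the whole backing array of the `ByteBuffer` and throws for read-only buffers. The following is a minimal sketch of a more defensive decoding step, using only the JDK; `PayloadDecoding` and `utf8` are hypothetical names for illustration and are not part of gfc-aws-kinesis.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of gfc-aws-kinesis.
object PayloadDecoding {

  /** Decodes the bytes between the buffer's position and limit as UTF-8.
    *
    * Decoding works on a read-only view, so the position of `data` itself
    * is left untouched, and read-only or offset buffers are handled too.
    */
  def utf8(data: ByteBuffer): String =
    StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()).toString
}
```

Under these assumptions, the body of `StringRecordReader.apply` could be written as `PayloadDecoding.utf8(r.getData)`.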

Publish events:

  implicit object StringRecordWriter extends KinesisRecordWriter[String] {
    override def toKinesisRecord(a: String) : KinesisRecord = {
      KinesisRecord("partition-key", a.getBytes("UTF-8"))
    }
  }

  val publisher = KinesisPublisher()

  val messages = Seq("Hello World!", "foo bar", "baz bam")

  val result: Future[KinesisPublisherBatchResult] = publisher.publishBatch("kinesis-stream-name", messages)
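
Kinesis hashes the partition key to choose a shard, so the constant `"partition-key"` in the writer above sends every record to the same shard. If ordering between messages does not matter, a per-message key spreads the load. This is a minimal sketch under that assumption; `PartitionKeys.forPayload` is a hypothetical helper built on the JDK only, not part of gfc-aws-kinesis.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical helper, not part of gfc-aws-kinesis.
object PartitionKeys {

  /** Derives a stable 32-character hex key from the payload.
    *
    * Equal payloads always map to the same key (and so the same shard),
    * while different payloads are spread across the key space.
    */
  def forPayload(payload: String): String = {
    val digest = MessageDigest
      .getInstance("SHA-256")
      .digest(payload.getBytes(StandardCharsets.UTF_8))
    // Keep the first 16 bytes and render each as two hex digits.
    digest.take(16).map(b => "%02x".format(b)).mkString
  }
}
```

In the writer above, this could replace the constant: `KinesisRecord(PartitionKeys.forPayload(a), a.getBytes("UTF-8"))`. When records that belong together must stay in order, derive the key from a shared identifier instead of the payload.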

DynamoDB streaming

Create the adapter client

val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
    new AmazonDynamoDBStreamsAdapterClient()

Pass the adapter client in the configuration

val streamSource = {
  val streamConfig = KinesisStreamConsumerConfig[Option[A]](
    applicationName,
    config.stream,
    regionName = Some(config.region),
    checkPointInterval = config.checkpointInterval,
    initialPositionInStream = config.streamPosition,
    dynamoDBKinesisAdapterClient = streamAdapterClient
  )
  KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
}

Provide an implicit Kinesis record reader suitable for DynamoDB events

implicit val kinesisRecordReader: KinesisRecordReader[Option[A]] =
  new KinesisRecordReader[Option[A]] {
    override def apply(record: Record): Option[A] =
      record match {
        case recordAdapter: RecordAdapter =>
          val dynamoRecord: DynamoRecord = recordAdapter.getInternalObject
          dynamoRecord.getEventName match {
            // Only newly inserted items are decoded; MODIFY and REMOVE events yield None.
            case "INSERT" =>
              ScanamoFree.read[A](dynamoRecord.getDynamodb.getNewImage).toOption
            case _ => None
          }
        // Not a DynamoDB Streams record.
        case _ => None
      }
  }

Consume the stream, e.g. using a sink

val targetSink = Sink.actorRefWithAck(target, startMsg, ackMsg, Done)

streamSource
  .collect { case Some(a) => a } // drop records the reader could not decode
  .log(applicationName)(log)
  .runWith(targetSink)

License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

org.gfccollective

The GFC Collective

The new home of the (former) Gilt Foundation Classes

Versions

Version
1.0.0