gfc-aws-kinesis


License: Apache-style
Categories: AWS Container PaaS Providers
GroupId: com.gilt
ArtifactId: gfc-aws-kinesis_2.11
Last Version: 0.17.3-RC1
Type: jar
Description: gfc-aws-kinesis
Project Organization: com.gilt

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.gilt/gfc-aws-kinesis_2.11/ -->
<dependency>
    <groupId>com.gilt</groupId>
    <artifactId>gfc-aws-kinesis_2.11</artifactId>
    <version>0.17.3-RC1</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.gilt/gfc-aws-kinesis_2.11/
implementation 'com.gilt:gfc-aws-kinesis_2.11:0.17.3-RC1'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.gilt/gfc-aws-kinesis_2.11/
implementation ("com.gilt:gfc-aws-kinesis_2.11:0.17.3-RC1")

Buildr:

'com.gilt:gfc-aws-kinesis_2.11:jar:0.17.3-RC1'

Ivy:

<dependency org="com.gilt" name="gfc-aws-kinesis_2.11" rev="0.17.3-RC1">
  <artifact name="gfc-aws-kinesis_2.11" type="jar" />
</dependency>

Grape:

@Grapes(
  @Grab(group='com.gilt', module='gfc-aws-kinesis_2.11', version='0.17.3-RC1')
)

SBT:

libraryDependencies += "com.gilt" % "gfc-aws-kinesis_2.11" % "0.17.3-RC1"

Leiningen:

[com.gilt/gfc-aws-kinesis_2.11 "0.17.3-RC1"]

Dependencies

compile (7)

Group : Artifact                                    Type  Version
org.scala-lang : scala-library                      jar   2.11.12
com.gilt : gfc-util_2.11                            jar   0.2.2
com.gilt : gfc-logging_2.11                         jar   0.0.8
com.gilt : gfc-concurrent_2.11                      jar   0.3.8
com.amazonaws : aws-java-sdk-kinesis                jar   1.11.333
com.amazonaws : amazon-kinesis-client               jar   1.9.0
com.amazonaws : dynamodb-streams-kinesis-adapter    jar   1.2.2

test (1)

Group : Artifact                                    Type  Version
org.specs2 : specs2-scalacheck_2.11                 jar   4.0.3

Project Modules

There are no modules declared in this project.

gfc-aws-kinesis

Chat: https://gitter.im/gilt/gfc

A Scala wrapper around the AWS Kinesis Client Library. Part of the Gilt Foundation Classes.

Getting gfc-aws-kinesis

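The version documented in this README is 0.17.0, which is cross-built against Scala 2.11.x and 2.12.x. Newer artifacts (0.17.1 and 0.17.3-RC1) appear in the version list at the end of this page.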

SBT dependency:

libraryDependencies += "com.gilt" %% "gfc-aws-kinesis" % "0.17.0"

SBT dependency for the Akka Streams (2.5.x) module:

libraryDependencies += "com.gilt" %% "gfc-aws-kinesis-akka" % "0.17.0"
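
As a minimal sketch, a build.sbt that pulls in both modules might look like the fragment below. The scalaVersion value is an assumption chosen to match the scala-library version listed under Dependencies. The `%%` operator appends the Scala binary version to the artifact name, so under Scala 2.11 the first entry resolves to gfc-aws-kinesis_2.11, the ArtifactId shown at the top of this page.

```scala
// build.sbt (sketch; scalaVersion is an assumption, pick the one your project uses)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Core wrapper around the Kinesis Client Library
  "com.gilt" %% "gfc-aws-kinesis"      % "0.17.0",
  // Optional: only needed for the Akka Streams source
  "com.gilt" %% "gfc-aws-kinesis-akka" % "0.17.0"
)
```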

Basic usage

Consume events:

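  // Imports are assumed from the library's package layout and the AWS SDK; adjust to your versions.
  import scala.concurrent.Future
  import scala.concurrent.duration._
  import com.amazonaws.services.kinesis.model.Record
  import com.gilt.gfc.aws.kinesis.client.{KCLConfiguration, KCLWorkerRunner, KinesisRecordReader}

  // Decodes each Kinesis record's payload as a UTF-8 string.
  implicit object StringRecordReader extends KinesisRecordReader[String] {
    override def apply(r: Record): String = new String(r.getData.array(), "UTF-8")
  }

  val config = KCLConfiguration("consumer-name", "kinesis-stream-name")

  // Runs a worker that hands each decoded record to the callback.
  KCLWorkerRunner(config).runAsyncSingleRecordProcessor[String](1.minute) { a: String =>
    // ... do something with `a`
    Future.successful(())
  }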
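
The reader above calls `array()` on the record's ByteBuffer. That returns the whole backing array and throws on a read-only buffer, so it can misbehave if the buffer is a slice or a read-only view. Whether the Kinesis client ever hands out such buffers is not stated here; the following is only a defensive sketch using the Java standard library, with `decodeUtf8` as a hypothetical helper name.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object DecodeSketch {
  // Decodes only the bytes between position and limit.
  // duplicate() gives an independent position, so the caller's buffer is not consumed.
  def decodeUtf8(data: ByteBuffer): String =
    StandardCharsets.UTF_8.decode(data.duplicate()).toString

  def main(args: Array[String]): Unit = {
    val backing = "xxhelloyy".getBytes(StandardCharsets.UTF_8)
    // A buffer that exposes only the five bytes of "hello" inside a larger array.
    val slice = ByteBuffer.wrap(backing, 2, 5)
    println(decodeUtf8(slice))                                  // the slice only
    println(new String(slice.array(), StandardCharsets.UTF_8)) // the whole backing array
  }
}
```

A record reader could delegate to `decodeUtf8(r.getData)` instead of building the string from `array()`.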

Publish events:

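  // Imports are assumed from the library's package layout; adjust to your version.
  import scala.concurrent.Future
  import com.gilt.gfc.aws.kinesis.client.{KinesisPublisher, KinesisPublisherBatchResult, KinesisRecord, KinesisRecordWriter}

  // Encodes each string as UTF-8 bytes. Every record here shares one partition key.
  implicit object StringRecordWriter extends KinesisRecordWriter[String] {
    override def toKinesisRecord(a: String): KinesisRecord =
      KinesisRecord("partition-key", a.getBytes("UTF-8"))
  }

  val publisher = KinesisPublisher()

  val messages = Seq("Hello World!", "foo bar", "baz bam")

  // Publishes all messages to the stream in one batch call.
  val result: Future[KinesisPublisherBatchResult] =
    publisher.publishBatch("kinesis-stream-name", messages)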
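
The writer above uses one constant partition key, so every record maps to the same shard. Kinesis assigns a record to a shard by taking the MD5 hash of its partition key as a 128-bit integer and finding the shard whose hash key range contains it. The sketch below illustrates that mapping with the standard library only. It assumes a stream whose shards split the hash space evenly, which real streams need not do, and `shardIndex` is a hypothetical helper, not part of this library.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object PartitionKeySketch {
  // Size of the 128-bit hash key space.
  private val HashSpace: BigInt = BigInt(2).pow(128)

  // MD5 of the partition key, read as an unsigned 128-bit integer.
  def hashKey(partitionKey: String): BigInt = {
    val digest = MessageDigest
      .getInstance("MD5")
      .digest(partitionKey.getBytes(StandardCharsets.UTF_8))
    BigInt(1, digest)
  }

  // Index of the shard that would receive the key,
  // ASSUMING shardCount shards with evenly split hash key ranges.
  def shardIndex(partitionKey: String, shardCount: Int): Int =
    (hashKey(partitionKey) * shardCount / HashSpace).toInt
}
```

Deriving the key from the payload, for example a user or order identifier, spreads records across shards while keeping records with the same key in order on one shard.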

DynamoDB streaming

Create the adapter client

val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
    new AmazonDynamoDBStreamsAdapterClient()

Pass the adapter client in the configuration

val streamSource = {
  val streamConfig = KinesisStreamConsumerConfig[Option[A]](
    applicationName,
    config.stream,
    regionName = Some(config.region),
    checkPointInterval = config.checkpointInterval,
    initialPositionInStream = config.streamPosition,
    dynamoDBKinesisAdapterClient = streamAdapterClient
  )
  KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
}

Pass an implicit Kinesis record reader suitable for DynamoDB events

implicit val kinesisRecordReader: KinesisRecordReader[Option[A]] =
  new KinesisRecordReader[Option[A]] {
    override def apply(record: Record): Option[A] =
      record match {
        case recordAdapter: RecordAdapter =>
          val dynamoRecord: DynamoRecord = recordAdapter.getInternalObject
          dynamoRecord.getEventName match {
            case "INSERT" =>
              ScanamoFree.read[A](dynamoRecord.getDynamodb.getNewImage).toOption
            case _ => None
          }
        case _ => None
      }
  }
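
The reader above keeps only INSERT events. DynamoDB Streams also emits MODIFY and REMOVE events, and one way to handle all three is to map them onto a small ADT. The following is a self-contained sketch with stand-in types: `Image`, `Change`, and `toChange` are hypothetical names, and a plain `Map[String, String]` replaces the real attribute-value map. Which images are present depends on the stream's view type, so both are modelled as `Option`.

```scala
object ChangeEventSketch {
  // Stand-in for a DynamoDB item image (hypothetical simplification).
  type Image = Map[String, String]

  sealed trait Change
  final case class Inserted(newImage: Image) extends Change
  final case class Modified(oldImage: Image, newImage: Image) extends Change
  final case class Removed(oldImage: Image) extends Change

  // Maps an event name plus whichever images are available onto a Change.
  // Returns None for unknown event names or when a required image is missing.
  def toChange(
      eventName: String,
      oldImage: Option[Image],
      newImage: Option[Image]
  ): Option[Change] =
    (eventName, oldImage, newImage) match {
      case ("INSERT", _, Some(n))       => Some(Inserted(n))
      case ("MODIFY", Some(o), Some(n)) => Some(Modified(o, n))
      case ("REMOVE", Some(o), _)       => Some(Removed(o))
      case _                            => None
    }
}
```

A reader built this way would return `Option[Change]` and leave the choice of which events to act on to the downstream stage.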

Consume the stream, for example with a sink

val targetSink = Sink.actorRefWithAck(target, startMsg, ackMsg, Done)

streamSource
  .collect { case Some(a) => a } // drop records the reader could not decode
  .log(applicationName)(log)
  .runWith(targetSink)

Organization: com.gilt (Gilt Tech)

Versions

Version
0.17.3-RC1
0.17.1
0.17.0
0.16.2
0.16.1
0.16.0
0.15.1
0.15.0
0.14.3
0.14.2
0.14.1
0.14.0
0.13.0
0.12.1
0.12.0
0.11.1
0.11.0
0.10.3
0.10.2
0.10.1
0.10.0
0.9.0
0.8.0
0.7.0
0.5.0
0.4.0
0.3.2
0.3.1
0.3.0
0.2.0
0.1.0