gfc-aws-kinesis


License: Apache-style
Categories: AWS Container PaaS Providers
GroupId: com.gilt
ArtifactId: gfc-aws-kinesis_2.10
Last Version: 0.9.0
Release Date:
Type: jar
Description: gfc-aws-kinesis
Project Organization: com.gilt


How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.gilt/gfc-aws-kinesis_2.10/ -->
<dependency>
    <groupId>com.gilt</groupId>
    <artifactId>gfc-aws-kinesis_2.10</artifactId>
    <version>0.9.0</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.gilt/gfc-aws-kinesis_2.10/
implementation 'com.gilt:gfc-aws-kinesis_2.10:0.9.0'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.gilt/gfc-aws-kinesis_2.10/
implementation("com.gilt:gfc-aws-kinesis_2.10:0.9.0")

Buildr:

'com.gilt:gfc-aws-kinesis_2.10:jar:0.9.0'

Ivy:

<dependency org="com.gilt" name="gfc-aws-kinesis_2.10" rev="0.9.0">
  <artifact name="gfc-aws-kinesis_2.10" type="jar" />
</dependency>

Grape:

@Grapes(
    @Grab(group='com.gilt', module='gfc-aws-kinesis_2.10', version='0.9.0')
)

SBT:

libraryDependencies += "com.gilt" % "gfc-aws-kinesis_2.10" % "0.9.0"

Leiningen:

[com.gilt/gfc-aws-kinesis_2.10 "0.9.0"]

Dependencies

compile (6)

Group / Artifact Type Version
org.scala-lang : scala-library jar 2.10.5
com.gilt : gfc-util_2.10 jar 0.1.1
com.gilt : gfc-logging_2.10 jar 0.0.3
com.gilt : gfc-concurrent_2.10 jar 0.2.0
com.amazonaws : aws-java-sdk-kinesis jar 1.11.18
com.amazonaws : amazon-kinesis-client jar 1.6.4

provided (2)

Group / Artifact Type Version
org.scoverage : scalac-scoverage-runtime_2.10 jar 1.1.0
org.scoverage : scalac-scoverage-plugin_2.10 jar 1.1.0

test (1)

Group / Artifact Type Version
org.specs2 : specs2-scalacheck_2.10 jar 3.6.5

Project Modules

There are no modules declared in this project.

gfc-aws-kinesis

Join the chat at https://gitter.im/gilt/gfc

Scala wrapper around AWS Kinesis Client Library. Part of the Gilt Foundation Classes.

Getting gfc-aws-kinesis

The latest version is 0.17.0, which is cross-built against Scala 2.11.x and 2.12.x. The Scala 2.10 artifact listed on this page (gfc-aws-kinesis_2.10) has 0.9.0 as its last version.

SBT dependency:

libraryDependencies += "com.gilt" %% "gfc-aws-kinesis" % "0.17.0"

SBT Akka stream (2.5.x) dependency:

libraryDependencies += "com.gilt" %% "gfc-aws-kinesis-akka" % "0.17.0"
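Put together, the two dependency lines could sit in a build definition like the sketch below. The project name and the exact Scala patch version are placeholders chosen for illustration. The `%%` operator makes sbt append the Scala binary version to the artifact name, so with Scala 2.12 it resolves gfc-aws-kinesis_2.12.

```scala
// build.sbt (sketch; project name and Scala patch version are placeholders)
name := "kinesis-example"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "com.gilt" %% "gfc-aws-kinesis"      % "0.17.0",
  "com.gilt" %% "gfc-aws-kinesis-akka" % "0.17.0" // only needed for the Akka stream integration
)
```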

Basic usage

Consume events:

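  import scala.concurrent.Future
  import scala.concurrent.duration._
  import com.amazonaws.services.kinesis.model.Record
  // KCLConfiguration, KCLWorkerRunner and KinesisRecordReader come from gfc-aws-kinesis
  // (package assumed here to be com.gilt.gfc.aws.kinesis.client).
  import com.gilt.gfc.aws.kinesis.client._

  // Decodes each Kinesis record payload as a UTF-8 string.
  implicit object StringRecordReader extends KinesisRecordReader[String]{
    override def apply(r: Record) : String = new String(r.getData.array(), "UTF-8")
  }

  val config = KCLConfiguration("consumer-name", "kinesis-stream-name")

  // The duration is the timeout allowed for each asynchronous callback.
  KCLWorkerRunner(config).runAsyncSingleRecordProcessor[String](1.minute) { a: String =>
     // .. do something with a
     Future.successful(())
  }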
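The reader above calls `r.getData.array()`, which returns the buffer's whole backing array and is only available on writable, array-backed buffers. That is fine when the payload arrives as a plain heap buffer. For other cases, a more defensive decode can be sketched with `java.nio` alone. The helper name `decodeUtf8` is hypothetical and not part of gfc-aws-kinesis.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object RecordDecoding {
  // Decode the bytes between the buffer's position and limit as UTF-8.
  // Working on a read-only view leaves the caller's position untouched
  // and also works for read-only or direct buffers, where array() throws.
  def decodeUtf8(data: ByteBuffer): String =
    StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()).toString
}
```

With such a helper, the body of the reader's `apply` could become `RecordDecoding.decodeUtf8(r.getData)`.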

Publish events:

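  import scala.concurrent.Future

  // Encodes a string as a Kinesis record: UTF-8 payload with a fixed partition key.
  implicit object StringRecordWriter extends KinesisRecordWriter[String] {
    override def toKinesisRecord(a: String) : KinesisRecord = {
      KinesisRecord("partition-key", a.getBytes("UTF-8"))
    }
  }

  val publisher = KinesisPublisher()

  val messages = Seq("Hello World!", "foo bar", "baz bam")

  // Publishes all messages in one batch; the Future completes with the batch result.
  val result: Future[KinesisPublisherBatchResult] = publisher.publishBatch("kinesis-stream-name", messages)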
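Kinesis chooses the target shard by hashing the partition key, so the fixed "partition-key" string in the writer above sends every record to the same shard. One possible way to spread records is to derive the key from the payload, sketched here with the standard library only. The object and method names are hypothetical, and a natural business key (for example a user id) is often a better choice when one exists.

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID

object PartitionKeys {
  // Deterministic key derived from the payload: equal payloads map to equal keys.
  // A name-based UUID prints as 36 characters, well inside the 256-character
  // limit Kinesis places on partition keys.
  def forPayload(payload: String): String =
    UUID.nameUUIDFromBytes(payload.getBytes(StandardCharsets.UTF_8)).toString
}
```

In the writer this would replace the literal, for example `KinesisRecord(PartitionKeys.forPayload(a), a.getBytes("UTF-8"))`.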

DynamoDB streaming

Create the adapter client:

val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
    new AmazonDynamoDBStreamsAdapterClient()

Pass the adapter client in the configuration:

val streamSource = {
    val streamConfig = KinesisStreamConsumerConfig[Option[A]](
      applicationName,
      config.stream,
      regionName = Some(config.region),
      checkPointInterval = config.checkpointInterval,
      initialPositionInStream = config.streamPosition,
      dynamoDBKinesisAdapterClient = streamAdapterClient
    )
    KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
  }

Provide an implicit Kinesis record reader suitable for DynamoDB events:

implicit val kinesisRecordReader
      : KinesisRecordReader[Option[A]] =
      new KinesisRecordReader[Option[A]] {
        override def apply(record: Record): Option[A] = {
          record match {
            case recordAdapter: RecordAdapter =>
              val dynamoRecord: DynamoRecord =
                recordAdapter.getInternalObject
              dynamoRecord.getEventName match {
                case "INSERT" =>
                  ScanamoFree
                    .read[A](
                      dynamoRecord.getDynamodb.getNewImage)
                    .toOption
                case _ => None
              }
            case _ => None
          }
        }
      }

Consume the stream, for example with a sink:

val targetSink = Sink.actorRefWithAck(target, startMsg, ackMsg, Done)

streamSource
  .filter(!_.isEmpty)
  .map(_.get)
  .log(applicationName)(log)
  .runWith(targetSink)
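
The stream carries Option values because the record reader returns None for anything other than an INSERT event, so the empty values have to be dropped before the sink. The sketch below shows two equivalent ways of doing that on a plain Scala List with made-up sample data. Akka Streams offers a `collect` operator on sources and flows as well, so the one-step form can be used in a stream in the same way.

```scala
object DropEmpty {
  // Sample data standing in for decoded stream elements.
  val events: List[Option[String]] = List(Some("a"), None, Some("b"))

  // Two steps: keep the defined values, then unwrap them.
  val viaFilterAndGet: List[String] = events.filter(!_.isEmpty).map(_.get)

  // One step: the partial function matches only Some and unwraps it.
  val viaCollect: List[String] = events.collect { case Some(e) => e }
}
```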
com.gilt

Gilt Tech

Versions

Version
0.9.0
0.8.0
0.7.0
0.5.0
0.4.0
0.3.2
0.3.1
0.3.0