amqp-akka-streams


License: MIT
Categories: Akka Container Microservices Reactive libraries
GroupId: com.holidaycheck
ArtifactId: amqp-akka-streams_2.12
Last Version: 2.0.0
Type: jar
Description: amqp-akka-streams
Project URL: https://github.com/holidaycheck/amqp-akka-streams
Project Organization: com.holidaycheck
Source Code Management: https://github.com/holidaycheck/amqp-akka-streams

How to add to project

Maven:
<!-- https://jarcasting.com/artifacts/com.holidaycheck/amqp-akka-streams_2.12/ -->
<dependency>
    <groupId>com.holidaycheck</groupId>
    <artifactId>amqp-akka-streams_2.12</artifactId>
    <version>2.0.0</version>
</dependency>

Gradle (Groovy DSL):
// https://jarcasting.com/artifacts/com.holidaycheck/amqp-akka-streams_2.12/
implementation 'com.holidaycheck:amqp-akka-streams_2.12:2.0.0'

Gradle (Kotlin DSL):
// https://jarcasting.com/artifacts/com.holidaycheck/amqp-akka-streams_2.12/
implementation("com.holidaycheck:amqp-akka-streams_2.12:2.0.0")

Buildr:
'com.holidaycheck:amqp-akka-streams_2.12:jar:2.0.0'

Ivy:
<dependency org="com.holidaycheck" name="amqp-akka-streams_2.12" rev="2.0.0">
  <artifact name="amqp-akka-streams_2.12" type="jar" />
</dependency>

Grape:
@Grapes(
@Grab(group='com.holidaycheck', module='amqp-akka-streams_2.12', version='2.0.0')
)

SBT:
libraryDependencies += "com.holidaycheck" % "amqp-akka-streams_2.12" % "2.0.0"

Leiningen:
[com.holidaycheck/amqp-akka-streams_2.12 "2.0.0"]

Dependencies

compile (4)

Group / Artifact                                  Type  Version
com.typesafe.akka : akka-actor_2.12               jar   2.4.19
com.typesafe.akka : akka-stream_2.12              jar   2.4.19
com.rabbitmq : amqp-client                        jar   4.2.0
org.apache.qpid : qpid-broker                     jar   6.1.4

test (3)

Group / Artifact                                  Type  Version
org.scalatest : scalatest_2.12                    jar   3.0.3
org.scalamock : scalamock-scalatest-support_2.12  jar   3.6.0
org.reactivestreams : reactive-streams-tck        jar   1.0.0

Project Modules

There are no modules declared in this project.

amqp-akka-streams

This library provides Akka Streams Sinks and Sources for publishing messages to, and consuming messages from, an AMQP queue. It relies on the RabbitMQ Java Client 4.x, but it should work with any broker that complies with AMQP 0-9-1, 0-9 or 0-8.

The utilities are grouped into two classes, AmqpProducer (for producing) and AmqpConsumer (for consuming), which share a common implicit dependency on AmqpConnection. The connection class handles resource management: it lets you check the connection status and close the connection to the broker.

AmqpProducer

AmqpProducer holds an Akka Streams Sink that publishes messages to an AMQP queue or an AMQP exchange. The producer requires two implicit parameters: a PayloadMarshaller[T] for data serialization and an AmqpConnection for access to the AMQP broker.

Example usage

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.holidaycheck.streams.amqp.AmqpProducer
import com.holidaycheck.streams.amqp.AmqpConnection

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher // implicit ExecutionContext needed by flatMap on the Future below
implicit val stringMarshaller: AmqpProducer.PayloadMarshaller[String] = ByteString(_)

val connectionConfiguration = AmqpConnection.Configuration(
  host = "localhost",
  port = 5672,
  virtualHost = "vh",
  username = "guest",
  password = "guest"
)
implicit val connection: AmqpConnection = AmqpConnection(connectionConfiguration)
val producer = AmqpProducer(AmqpProducer.Configuration.publishToQueue("queue"))

Source.single("test message").runWith(producer.sink).flatMap { _ =>
  connection.shutdown() // closes the connection and cleans up its resources
}
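
The example above marshals plain strings. For a structured payload, a marshaller might be sketched as a function from the payload type to ByteString. This sketch assumes that PayloadMarshaller[T] has the shape T => ByteString, as the lambda ByteString(_) above suggests; the Booking type and its semicolon-separated wire format are hypothetical and not part of the library. Only akka.util.ByteString is used.

```scala
import akka.util.ByteString

// Hypothetical payload type, used only for illustration.
final case class Booking(id: Long, hotel: String)

object BookingMarshalling {
  // Assumption: an implicit AmqpProducer.PayloadMarshaller[Booking] could be
  // defined with a lambda of this shape (Booking => ByteString).
  val marshal: Booking => ByteString =
    booking => ByteString(s"${booking.id};${booking.hotel}") // String is encoded as UTF-8
}
```

If the assumption holds, declaring an implicit AmqpProducer.PayloadMarshaller[Booking] with the same lambda body should let a Source of Booking values run against producer.sink in the same way as the string example.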

AmqpConsumer

AmqpConsumer holds two objects: a Source from which you read incoming messages and a Sink to which you send them for acknowledgement (ack). Acking informs the message broker that a message has been consumed successfully. The consumer requires two implicit parameters: a PayloadUnmarshaller[T] for data deserialization and an AmqpConnection for access to the AMQP broker. By default, the consumer provides a Delivery with a body of type ByteString unless you supply your own unmarshaller instance.

Example usage

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.holidaycheck.streams.amqp.AmqpConsumer
import com.holidaycheck.streams.amqp.AmqpConnection

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher // implicit ExecutionContext needed by flatMap on the Future below
implicit val stringUnmarshaller: AmqpConsumer.PayloadUnmarshaller[String] = _.decodeString("utf-8")

val connectionConfiguration = AmqpConnection.Configuration(
  host = "localhost",
  port = 5672,
  virtualHost = "vh",
  username = "guest",
  password = "guest"
)
implicit val connection: AmqpConnection = AmqpConnection(connectionConfiguration)
val consumer = AmqpConsumer(AmqpConsumer.Configuration("queue"))

consumer.source.map { delivery =>
  println(s"Consuming ${delivery.body}...")
  delivery
}.runWith(consumer.sink).flatMap { _ =>
  connection.shutdown() // closes the connection and cleans up its resources
}
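
The example above decodes the body as a plain string. A custom unmarshaller for a structured payload might look like the sketch below. It assumes that PayloadUnmarshaller[T] has the shape ByteString => T, as the lambda _.decodeString("utf-8") above suggests; the Rating type and its wire format are hypothetical. Wrapping the result in Try is a design choice of this sketch, not something the library prescribes: it keeps a malformed message from throwing inside the stream.

```scala
import akka.util.ByteString
import scala.util.Try

// Hypothetical payload type, used only for illustration.
final case class Rating(hotel: String, stars: Int)

object RatingUnmarshalling {
  // Assumption: an implicit AmqpConsumer.PayloadUnmarshaller[Try[Rating]] could be
  // defined with a lambda of this shape (ByteString => Try[Rating]).
  val unmarshal: ByteString => Try[Rating] = bytes =>
    Try {
      // Expected wire format: "<hotel>;<stars>"; anything else becomes a Failure.
      val Array(hotel, stars) = bytes.decodeString("utf-8").split(";", 2)
      Rating(hotel, stars.toInt)
    }
}
```

Downstream stages can then decide per message whether to process it or route it elsewhere, depending on whether decoding produced a Success or a Failure.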

How to add it to your project

The library is available for both Scala 2.11.x and 2.12.x. All you have to do is add the dependency:

"com.holidaycheck" %% "amqp-akka-streams" % "2.0.0"
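
As a sketch of how the dependency line fits into a build, the fragment below assumes an sbt project; the scalaVersion value is an assumption chosen for illustration. The %% operator appends the Scala binary version to the artifact name, so with a 2.12.x compiler the line resolves to the amqp-akka-streams_2.12 artifact.

```scala
// build.sbt (sketch; the scalaVersion value is an assumption)
scalaVersion := "2.12.4"

// %% adds the Scala binary version suffix, so this is equivalent to
// "com.holidaycheck" % "amqp-akka-streams_2.12" % "2.0.0" on Scala 2.12.x
libraryDependencies += "com.holidaycheck" %% "amqp-akka-streams" % "2.0.0"
```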

Versions

Version
2.0.0
1.4.0
1.3.1