storm-rabbitmq

Storm bolt and spout for RabbitMQ

License:
Categories: ORM Data
GroupId: ru.burov4j.storm
ArtifactId: storm-rabbitmq
Last Version: 1.0.1
Release Date:
Type: jar
Description: Storm bolt and spout for RabbitMQ
Project URL: https://github.com/burov4j/storm-rabbitmq
Source Code Management: https://github.com/burov4j/storm-rabbitmq

How to add to project

Maven

<!-- https://jarcasting.com/artifacts/ru.burov4j.storm/storm-rabbitmq/ -->
<dependency>
    <groupId>ru.burov4j.storm</groupId>
    <artifactId>storm-rabbitmq</artifactId>
    <version>1.0.1</version>
</dependency>

Gradle

// https://jarcasting.com/artifacts/ru.burov4j.storm/storm-rabbitmq/
implementation 'ru.burov4j.storm:storm-rabbitmq:1.0.1'

Gradle (Kotlin DSL)

// https://jarcasting.com/artifacts/ru.burov4j.storm/storm-rabbitmq/
implementation("ru.burov4j.storm:storm-rabbitmq:1.0.1")

Buildr

'ru.burov4j.storm:storm-rabbitmq:jar:1.0.1'

Ivy

<dependency org="ru.burov4j.storm" name="storm-rabbitmq" rev="1.0.1">
  <artifact name="storm-rabbitmq" type="jar" />
</dependency>

Grape

@Grapes(
    @Grab(group='ru.burov4j.storm', module='storm-rabbitmq', version='1.0.1')
)

SBT

libraryDependencies += "ru.burov4j.storm" % "storm-rabbitmq" % "1.0.1"

Leiningen

[ru.burov4j.storm/storm-rabbitmq "1.0.1"]

Dependencies

compile (1)

Group / Artifact                      Type  Version
com.rabbitmq : amqp-client            jar   5.6.0

provided (2)

Group / Artifact                      Type  Version
org.apache.storm : storm-core         jar   1.2.2
org.projectlombok : lombok            jar   1.18.6

test (3)

Group / Artifact                      Type  Version
junit : junit                         jar   4.12
org.mockito : mockito-core            jar   2.25.1
org.testcontainers : testcontainers   jar   1.11.1

Project Modules

There are no modules declared in this project.

Storm RabbitMQ

Provides implementations of IRichSpout and IRichBolt for RabbitMQ.

Maven

<dependency>
    <groupId>ru.burov4j.storm</groupId>
    <artifactId>storm-rabbitmq</artifactId>
    <version>1.0.1</version>
</dependency>

Gradle

implementation 'ru.burov4j.storm:storm-rabbitmq:1.0.1'

RabbitMQ Connection

You can set the RabbitMQ connection properties using RabbitMqConfigBuilder:

RabbitMqConfig rabbitMqConfig = new RabbitMqConfigBuilder()
        .setAddresses("localhost:5672")
        .setUsername("guest")
        .setPassword("guest")
        .setRequestedHeartbeat(60)
        .setVirtualHost("/")
        .build();

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(rabbitMqConfig, scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue");

The same properties can be set through Storm's configuration API:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue")
       .addConfiguration(RabbitMqConfig.KEY_ADDRESSES, "localhost:5672")
       .addConfiguration(RabbitMqConfig.KEY_USERNAME, "guest")
       .addConfiguration(RabbitMqConfig.KEY_PASSWORD, "guest")
       .addConfiguration(RabbitMqConfig.KEY_REQUESTED_HEARTBEAT, 60)
       .addConfiguration(RabbitMqConfig.KEY_VIRTUAL_HOST, "/");

You do not have to set every property. For example, you can set only the RabbitMQ address, in which case the other properties fall back to their defaults:

RabbitMqConfig rabbitMqConfig = new RabbitMqConfigBuilder()
        .setAddresses("localhost:5672")
        .build();

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(rabbitMqConfig, scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue");

RabbitMQ Spout

RabbitMqSpout deserializes incoming messages and emits them into your Storm topology. To use it, implement the RabbitMqMessageScheme interface:

class MyRabbitMqMessageScheme implements RabbitMqMessageScheme {

    @Override
    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }

    @Override
    public StreamedTuple convertToStreamedTuple(Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws ConvertionException {
        // your implementation here
    }

    @Override
    public Map<String, Fields> getStreamsOutputFields() {
        // your implementation here
    }

    @Override
    public void cleanup() {
        // your implementation here
    }
}
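
When a single queue carries several kinds of messages, a multi-stream scheme has to decide which output stream each message belongs to. The sketch below is one possible way to make that decision and is not taken from the library: StreamRouter is a hypothetical, JDK-only helper that maps routing-key prefixes to stream names. In a real scheme the routing key would come from envelope.getRoutingKey(), and the stream names would have to match the keys of the map returned by getStreamsOutputFields().

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of storm-rabbitmq: chooses an output stream
// name for a message based on the prefix of its routing key.
final class StreamRouter {

    // LinkedHashMap keeps registration order, so earlier prefixes win.
    private final Map<String, String> streamByPrefix = new LinkedHashMap<>();
    private final String defaultStream;

    StreamRouter(String defaultStream) {
        this.defaultStream = defaultStream;
    }

    // Registers a routing-key prefix and the stream it should map to.
    StreamRouter route(String routingKeyPrefix, String stream) {
        streamByPrefix.put(routingKeyPrefix, stream);
        return this;
    }

    // Returns the stream of the first matching prefix, or the default stream.
    String streamFor(String routingKey) {
        for (Map.Entry<String, String> entry : streamByPrefix.entrySet()) {
            if (routingKey.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return defaultStream;
    }
}
```

A scheme could build the router once in prepare and call streamFor for every message inside convertToStreamedTuple.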

If you need only one output stream, you can extend SingleStreamRabbitMqMessageScheme instead:

class MyRabbitMqMessageScheme extends SingleStreamRabbitMqMessageScheme {

    @Override
    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }
                
    @Override
    public List<Object> convertToTuple(Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws ConvertionException {
        // your implementation here
    }
    
    @Override
    public Fields getOutputFields() {
        // your implementation here
    }
    
    @Override
    public void cleanup() {
        // your implementation here
    }
}
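
As an illustration of what a convertToTuple body might do, the following sketch decodes a comma-separated UTF-8 message body into a list of tuple values. It uses only the JDK so that it can be run on its own. CsvBodyDecoder and the comma-separated message format are assumptions made for this example, not part of storm-rabbitmq. A real scheme would throw ConvertionException rather than IllegalArgumentException.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of storm-rabbitmq: holds decoding logic
// that a convertToTuple implementation could delegate to.
final class CsvBodyDecoder {

    private CsvBodyDecoder() {
    }

    // Decodes a UTF-8 body such as "user42,click" into one value per field.
    static List<Object> decode(byte[] body, int expectedFields) {
        String text = new String(body, StandardCharsets.UTF_8);
        // The -1 limit keeps trailing empty fields, so "a," yields two fields.
        String[] parts = text.split(",", -1);
        if (parts.length != expectedFields) {
            // A real scheme would throw ConvertionException here instead.
            throw new IllegalArgumentException(
                    "expected " + expectedFields + " fields but got " + parts.length);
        }
        List<Object> tuple = new ArrayList<>(parts.length);
        for (String part : parts) {
            tuple.add(part.trim());
        }
        return tuple;
    }
}
```

With this helper, convertToTuple could return CsvBodyDecoder.decode(body, 2), and getOutputFields would declare two matching field names.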

The next step is to pass your custom scheme to RabbitMqSpout:

MyRabbitMqMessageScheme scheme = new MyRabbitMqMessageScheme();
RabbitMqSpout rabbitMqSpout = new RabbitMqSpout(scheme);

You can also set some properties for RabbitMqSpout:

builder.setSpout("rabbitmq-spout", rabbitMqSpout)
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue") // required
       .addConfiguration(RabbitMqSpout.KEY_AUTO_ACK, false)
       .addConfiguration(RabbitMqSpout.KEY_PREFETCH_COUNT, 64)
       .addConfiguration(RabbitMqSpout.KEY_REQUEUE_ON_FAIL, false);

Note that the property RabbitMqSpout.KEY_QUEUE_NAME is required.

To run preparation logic on the RabbitMQ channel, implement the RabbitMqInitializer interface:

class MyRabbitMqInitializer implements RabbitMqInitializer {
    
    @Override
    public void initialize(Channel channel) throws IOException {
        // your implementation here
    }
}

Then register it with your spout:

RabbitMqInitializer myRabbitMqInitializer = new MyRabbitMqInitializer();
rabbitMqSpout.setInitializer(myRabbitMqInitializer);

RabbitMQ Bolt

To send messages from your Storm topology to RabbitMQ, use RabbitMqBolt. To do so, implement the TupleToRabbitMqMessageConverter interface:

class MyTupleToRabbitMqMessageConverter implements TupleToRabbitMqMessageConverter {

    @Override
    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }

    @Override
    public String getExchange(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public String getRoutingKey(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public AMQP.BasicProperties getProperties(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public byte[] getMessageBody(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public void cleanup() {
        // your implementation here
    }
}
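
As an illustration of what a getMessageBody implementation might do, the following sketch joins tuple values into a comma-separated UTF-8 byte array. It uses only the JDK so that it can be run on its own. CsvBodyEncoder and the comma-separated format are assumptions made for this example, not part of storm-rabbitmq. In a real converter the values would typically come from tuple.getValues().

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of storm-rabbitmq: holds encoding logic
// that a getMessageBody implementation could delegate to.
final class CsvBodyEncoder {

    private CsvBodyEncoder() {
    }

    // Joins the values with commas and encodes the result as UTF-8.
    // Null values are written as the string "null" by String.valueOf.
    static byte[] encode(List<?> values) {
        String line = values.stream()
                .map(value -> String.valueOf(value))
                .collect(Collectors.joining(","));
        return line.getBytes(StandardCharsets.UTF_8);
    }
}
```

This sketch does not escape commas inside values, so it only suits fields that cannot contain the separator.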

The next step is to pass your custom converter to RabbitMqBolt:

MyTupleToRabbitMqMessageConverter converter = new MyTupleToRabbitMqMessageConverter();
RabbitMqBolt rabbitMqBolt = new RabbitMqBolt(converter);

You can also set some properties for RabbitMqBolt:

builder.setBolt("rabbitmq-bolt", rabbitMqBolt)
       .addConfiguration(RabbitMqBolt.KEY_MANDATORY, false)
       .addConfiguration(RabbitMqBolt.KEY_IMMEDIATE, false);

For more information about these RabbitMQ properties, see https://www.rabbitmq.com/amqp-0-9-1-reference.html

Versions

Version
1.0.1
1.0.0