storm-rabbitmq

Storm bolt and spout for RabbitMQ

License

License

Categories

Categories

Net ORM Data
GroupId

GroupId

net.syberia.storm
ArtifactId

ArtifactId

storm-rabbitmq
Last Version

Last Version

0.2.20
Release Date

Release Date

Type

Type

jar
Description

Description

storm-rabbitmq
Storm bolt and spout for RabbitMQ
Project URL

Project URL

https://github.com/anderelate/storm-rabbitmq
Source Code Management

Source Code Management

https://github.com/anderelate/storm-rabbitmq

Download storm-rabbitmq

How to add to project

<!-- https://jarcasting.com/artifacts/net.syberia.storm/storm-rabbitmq/ -->
<dependency>
    <groupId>net.syberia.storm</groupId>
    <artifactId>storm-rabbitmq</artifactId>
    <version>0.2.20</version>
</dependency>
// https://jarcasting.com/artifacts/net.syberia.storm/storm-rabbitmq/
implementation 'net.syberia.storm:storm-rabbitmq:0.2.20'
// https://jarcasting.com/artifacts/net.syberia.storm/storm-rabbitmq/
implementation ("net.syberia.storm:storm-rabbitmq:0.2.20")
'net.syberia.storm:storm-rabbitmq:jar:0.2.20'
<dependency org="net.syberia.storm" name="storm-rabbitmq" rev="0.2.20">
  <artifact name="storm-rabbitmq" type="jar" />
</dependency>
@Grapes(
@Grab(group='net.syberia.storm', module='storm-rabbitmq', version='0.2.20')
)
libraryDependencies += "net.syberia.storm" % "storm-rabbitmq" % "0.2.20"
[net.syberia.storm/storm-rabbitmq "0.2.20"]

Dependencies

compile (1)

Group / Artifact Type Version
com.rabbitmq : amqp-client jar 5.1.2

provided (2)

Group / Artifact Type Version
org.apache.storm : storm-core jar 1.2.1
org.projectlombok : lombok jar 1.16.20

test (2)

Group / Artifact Type Version
junit : junit jar 4.12
org.mockito : mockito-core jar 2.15.0

Project Modules

There are no modules declared in this project.

Storm RabbitMQ

Build Status codecov Maven Version

Provides implementations of IRichSpout and IRichBolt for RabbitMQ.

Maven

<dependency>
    <groupId>ru.burov4j.storm</groupId>
    <artifactId>storm-rabbitmq</artifactId>
    <version>1.0.1</version>
</dependency>

Gradle

compile 'ru.burov4j.storm:storm-rabbitmq:1.0.1'

RabbitMQ Connection

You can set RabbitMQ connection properties using RabbitMqConfigBuilder:

RabbitMqConfig rabbitMqConfig = new RabbitMqConfigBuilder()
                .setAddresses("localhost:5672")
                .setUsername("guest")
                .setPassword("guest")
                .setRequestedHeartbeat(60)
                .setVirtualHost("/")
                .build();

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(rabbitMqConfig, scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue");

The same with Storm's API:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue")
       .addConfiguration(RabbitMqConfig.KEY_ADDRESSES, "localhost:5672")
       .addConfiguration(RabbitMqConfig.KEY_USERNAME, "guest")
       .addConfiguration(RabbitMqConfig.KEY_PASSWORD, "guest")
       .addConfiguration(RabbitMqConfig.KEY_REQUESTED_HEARTBEAT, 60)
       .addConfiguration(RabbitMqConfig.KEY_VIRTUAL_HOST, "/");

It is not required to set all of properties: for example, you can set only RabbitMQ address. In the case another properties will set as defaults:

RabbitMqConfig rabbitMqConfig = new RabbitMqConfigBuilder()
                .setAddresses("localhost:5672")
                .build();

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(rabbitMqConfig, scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue");

RabbitMQ Spout

RabbitMqSpout deserializes input messages and then sends it in your Storm's topology. For using the class you should implement RabbitMqMessageScheme interface:

class MyRabbitMqMessageScheme implements RabbitMqMessageScheme {

    @Override
    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }

    @Override
    public StreamedTuple convertToStreamedTuple(Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws ConvertionException {
        // your implementation here
    }

    @Override
    public Map<String, Fields> getStreamsOutputFields() {
        // your implementation here
    }

    @Override
    public void cleanup() {
        // your implementation here
    }
}

If you want to use only one output stream you can extends SingleStreamRabbitMqMessageScheme:

class MyRabbitMqMessageScheme extends SingleStreamRabbitMqMessageScheme {

    @Override
    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }
                
    @Override
    public List<Object> convertToTuple(Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws ConvertionException {
        // your implementation here
    }
    
    @Override
    public Fields getOutputFields() {
        // your implementation here
    }
    
    @Override
    public void cleanup() {
        // your implementation here
    }
}

The next step is to pass your custom scheme to RabbitMqSpout:

MyRabbitMqMessageScheme scheme = new MyRabbitMqMessageScheme();
RabbitMqSpout rabbitMqSpout = new RabbitMqSpout(scheme);

You can also set some properties for RabbitMqSpout:

builder.setSpout("rabbitmq-spout", rabbitMqSpout)
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue") // required
       .addConfiguration(RabbitMqSpout.KEY_AUTO_ACK, false)
       .addConfiguration(RabbitMqSpout.KEY_PREFETCH_COUNT, 64)
       .addConfiguration(RabbitMqSpout.KEY_REQUEUE_ON_FAIL, false);

Note that the property RabbitMqSpout.KEY_QUEUE_NAME is required.

To do some preparation logic you can implement RabbitMqInitializer interface:

class MyRabbitMqInitializer implements RabbitMqInitializer {
    
    @Override
    public void initialize(Channel channel) throws IOException {
        // your implementation here
    }
}

and then put it in your spout:

RabbitMqInitializer myRabbitMqInitializer = new MyRabbitMqInitializer();
rabbitMqSpout.setInitializer(myRabbitMqInitializer);

RabbitMQ Bolt

If you want to send messages from your Storm's topology to RabbitMQ, you can use RabbitMqBolt. In the case you should implement TupleToRabbitMqMessageConverter interface:

class MyTupleToRabbitMqMessageConverter implements TupleToRabbitMqMessageConverter {

    @Override
    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }

    @Override
    public String getExchange(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public String getRoutingKey(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public AMQP.BasicProperties getProperties(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public byte[] getMessageBody(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public void cleanup() {
        // your implementation here
    }
}

The next step is to pass your custom converter to RabbitMqBolt:

MyTupleToRabbitMqMessageConverter converter = new MyTupleToRabbitMqMessageConverter();
RabbitMqBolt rabbitMqBolt = new RabbitMqBolt(converter);

You can also set some properties for RabbitMqBolt:

builder.setBolt("rabbitmq-bolt", rabbitMqBolt)
       .addConfiguration(RabbitMqBolt.KEY_MANDATORY, false)
       .addConfiguration(RabbitMqBolt.KEY_IMMEDIATE, false);

You can read more information about RabbitMQ properties here: https://www.rabbitmq.com/amqp-0-9-1-reference.html

Versions

Version
0.2.20
0.2.19
0.2.15
0.2.14
0.2.13
0.2.12
0.2.11
0.2.10
0.2.9
0.2.8
0.2.7
0.2.6
0.2.5
0.2.4
0.2.3
0.2.2
0.2.1
0.2.0
0.1.3
0.1.2
0.1.1
0.1.0