rocketmq-wrapper

https://github.com/zxgangandy/rocketmq-wrapper

GroupId: io.github.zxgangandy
ArtifactId: rocketmq-wrapper
Last Version: 1.1.1
Type: pom
Description: rocketmq-wrapper
Project URL: https://github.com/zxgangandy/rocketmq-wrapper
Source Code Management: https://github.com/zxgangandy/rocketmq-wrapper

Download rocketmq-wrapper

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/io.github.zxgangandy/rocketmq-wrapper/ -->
<dependency>
    <groupId>io.github.zxgangandy</groupId>
    <artifactId>rocketmq-wrapper</artifactId>
    <version>1.1.1</version>
    <type>pom</type>
</dependency>

Gradle (Groovy):

// https://jarcasting.com/artifacts/io.github.zxgangandy/rocketmq-wrapper/
implementation 'io.github.zxgangandy:rocketmq-wrapper:1.1.1'

Gradle (Kotlin):

// https://jarcasting.com/artifacts/io.github.zxgangandy/rocketmq-wrapper/
implementation ("io.github.zxgangandy:rocketmq-wrapper:1.1.1")

Buildr:

'io.github.zxgangandy:rocketmq-wrapper:pom:1.1.1'

Ivy:

<dependency org="io.github.zxgangandy" name="rocketmq-wrapper" rev="1.1.1">
  <artifact name="rocketmq-wrapper" type="pom" />
</dependency>

Grape:

@Grapes(
@Grab(group='io.github.zxgangandy', module='rocketmq-wrapper', version='1.1.1')
)

SBT:

libraryDependencies += "io.github.zxgangandy" % "rocketmq-wrapper" % "1.1.1"

Leiningen:

[io.github.zxgangandy/rocketmq-wrapper "1.1.1"]

Dependencies

There are no dependencies for this project. It is a standalone project that does not depend on any other jars.

Project Modules

  • rocketmq-wrapper-core

rocketmq-wrapper

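Introduction

Rocketmq-wrapper is a wrapper around the RocketMQ client library that supports sending and processing both normal and transactional messages. It makes it much easier to build applications on the RocketMQ client without having to deal with some of the low-level details.

  • Supports synchronous message sending
  • Supports asynchronous message sending
  • Supports transactional message sending

Usage

Add the library: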

<dependency>
  <groupId>io.github.zxgangandy</groupId>
  <artifactId>rocketmq-wrapper-core</artifactId>
  <version>1.1.1</version>
</dependency>

Message producer example:

private RMProducer producer;

@Before
public void init() {
    producer = RMWrapper.with(RMProducer.class)
            .producerGroup("producer-test")
            .nameSrvAddr("127.0.0.1:9876")
            .topic("test1")
            .retryTimes(3)
            // listener used for transactional messages
            .txListener(new TxListener())
            .start();
}

// Synchronous message
@Test
public void sendMsgSync() {
    try {
        SendResult sendResult = producer.sendSync("test", new MessageBody().setContent("a"));
        System.out.println("sendMsgSync, sendResult=" + sendResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

// Asynchronous message
@Test
public void sendMsgAsync() {
    try {
        producer.sendAsync("test", new MessageBody().setContent("b"), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("sendMsgAsync, sendResult=" + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("sendMsgAsync, e=" + e);
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

// Transactional message
@Test
public void sendTxMsg() {
    try {
        SendResult sendResult = producer.sendTransactional("test", new MessageBody().setContent("c"), "d");
        System.out.println("sendTxMsg, sendResult=" + sendResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
  • Transactional messages require an implementation of the TransactionListener interface; when using rocketmq-wrapper, you only need to extend AbstractTransactionListener.
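
The listener contract behind transactional messages can be illustrated without any RocketMQ classes. The sketch below is self-contained and is not the rocketmq-wrapper API: TxState, TxListenerSketch, and both of its methods are hypothetical stand-ins that only mirror the general shape of RocketMQ's TransactionListener (run the local transaction once the half message is stored, then answer the broker's later status check). The method signatures of AbstractTransactionListener are not shown in this document, so check the library source before adapting this.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RocketMQ's LocalTransactionState.
enum TxState { COMMIT, ROLLBACK, UNKNOWN }

// Hypothetical listener sketch; not the rocketmq-wrapper API.
class TxListenerSketch {
    // Outcome of each local transaction, keyed by transaction id.
    private final Map<String, TxState> outcomes = new ConcurrentHashMap<>();

    // Phase 1: run the local business logic and record how it ended.
    TxState executeLocalTransaction(String txId, Runnable localWork) {
        TxState state;
        try {
            localWork.run();
            state = TxState.COMMIT;
        } catch (RuntimeException e) {
            state = TxState.ROLLBACK;
        }
        outcomes.put(txId, state);
        return state;
    }

    // Phase 2: answer a status check; unseen transactions stay UNKNOWN.
    TxState checkLocalTransaction(String txId) {
        return outcomes.getOrDefault(txId, TxState.UNKNOWN);
    }
}

class TxListenerDemo {
    public static void main(String[] args) {
        TxListenerSketch listener = new TxListenerSketch();
        System.out.println(listener.executeLocalTransaction("tx-1", () -> { }));
        System.out.println(listener.executeLocalTransaction("tx-2", () -> {
            throw new IllegalStateException("local write failed");
        }));
        System.out.println(listener.checkLocalTransaction("tx-3"));
    }
}
```

Recording the outcome in phase 1 is what lets the status check give a consistent answer; a real application would normally keep that record in the same database transaction as the business data rather than in memory.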

Message consumer example:

RMWrapper.with(RMConsumer.class)
    .consumerGroup("consumer-test")
    .nameSrvAddr("127.0.0.1:9876")
    .topic("test")
    .concurrentlyMessageProcessor(new ConcurrentlyMessageProcessor<MessageBody>() {
        @Override
        public ConsumeConcurrentlyStatus process(MessageExt rawMsg, MessageBody messageBody) {
           System.out.println("messageBody=" + messageBody);
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    })
    .start();
  

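Custom message serialization

  • Users can also plug in their own message serializer to suit their preferences and business requirements; this only requires implementing the MessageConverter interface.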
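
As a rough illustration of what a custom serializer involves, here is a self-contained sketch. The ConverterSketch interface and its toBytes/fromBytes methods are assumptions made for this example; they stand in for the library's MessageConverter, whose actual method names and signatures are not shown in this document and may differ. Greeting is likewise a hypothetical payload type.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the library's MessageConverter interface;
// the real method names and signatures may differ.
interface ConverterSketch<T> {
    byte[] toBytes(T message);
    T fromBytes(byte[] payload);
}

// Example payload type used only in this sketch.
final class Greeting {
    final String content;

    Greeting(String content) {
        this.content = content;
    }
}

// Encodes a Greeting as UTF-8 text and decodes it back.
final class Utf8GreetingConverter implements ConverterSketch<Greeting> {
    @Override
    public byte[] toBytes(Greeting message) {
        return message.content.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Greeting fromBytes(byte[] payload) {
        return new Greeting(new String(payload, StandardCharsets.UTF_8));
    }
}

class ConverterDemo {
    public static void main(String[] args) {
        ConverterSketch<Greeting> converter = new Utf8GreetingConverter();
        byte[] wire = converter.toBytes(new Greeting("hello"));
        System.out.println(converter.fromBytes(wire).content);
    }
}
```

Whatever format is chosen, the producer and consumer sides need to use the same converter so that the bytes written by one can be read by the other.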

Message idempotency

Sending messages to multiple clusters

Dynamic topic consumption

Notes

Versions

Version
1.1.1
1.1.0
1.0.3