qmq-spring-boot-starter

A Spring Boot starter with QMQ support that helps you simplify QMQ configuration in Spring Boot.

License

License

Categories

Categories

Spring Boot Container Microservices
GroupId

GroupId

xin.wjtree.qmq
ArtifactId

ArtifactId

qmq-spring-boot-starter
Last Version

Last Version

1.0.2
Release Date

Release Date

Type

Type

jar
Description

Description

qmq-spring-boot-starter
A Spring Boot starter with QMQ support that helps you simplify QMQ configuration in Spring Boot.
Project URL

Project URL

https://github.com/wjtree/qmq-spring-boot-starter
Source Code Management

Source Code Management

https://github.com/wjtree/qmq-spring-boot-starter.git

Download qmq-spring-boot-starter

How to add to project

<!-- https://jarcasting.com/artifacts/xin.wjtree.qmq/qmq-spring-boot-starter/ -->
<dependency>
    <groupId>xin.wjtree.qmq</groupId>
    <artifactId>qmq-spring-boot-starter</artifactId>
    <version>1.0.2</version>
</dependency>
// https://jarcasting.com/artifacts/xin.wjtree.qmq/qmq-spring-boot-starter/
implementation 'xin.wjtree.qmq:qmq-spring-boot-starter:1.0.2'
// https://jarcasting.com/artifacts/xin.wjtree.qmq/qmq-spring-boot-starter/
implementation ("xin.wjtree.qmq:qmq-spring-boot-starter:1.0.2")
'xin.wjtree.qmq:qmq-spring-boot-starter:jar:1.0.2'
<dependency org="xin.wjtree.qmq" name="qmq-spring-boot-starter" rev="1.0.2">
  <artifact name="qmq-spring-boot-starter" type="jar" />
</dependency>
@Grapes(
@Grab(group='xin.wjtree.qmq', module='qmq-spring-boot-starter', version='1.0.2')
)
libraryDependencies += "xin.wjtree.qmq" % "qmq-spring-boot-starter" % "1.0.2"
[xin.wjtree.qmq/qmq-spring-boot-starter "1.0.2"]

Dependencies

compile (3)

Group / Artifact Type Version
org.springframework.boot : spring-boot-configuration-processor Optional jar
org.springframework.boot : spring-boot-autoconfigure jar
com.qunar.qmq : qmq jar 1.1.11

Project Modules

There are no modules declared in this project.

qmq-spring-boot-starter

Maven Central GitHub release GitHub license Build Status Codacy Badge GitHub repo size Gitter

QMQ
Spring Boot Starter for QMQ

引入 Maven 依赖(已上传到中央仓库)

<dependency>
    <groupId>xin.wjtree.qmq</groupId>
    <artifactId>qmq-spring-boot-starter</artifactId>
    <version>1.0.1</version>
</dependency>

添加 Spring Boot 配置(YML)

spring:
  application:
    name: qmq-demo
  qmq:
    # QMQ app code (required)
    app-code: qmq-demo
    # QMQ meta server address (required)
    meta-server: http://127.0.0.1:8080/meta/address

    # Producer settings: thread pool used for sending messages (optional)
    producer:
      # Number of sender threads, default 3
      send-threads: 2
      # Maximum batch size per send, default 30
      send-batch: 30
      # Number of retries when sending a message fails, default 10
      send-try-count: 10
      # Size of the asynchronous send queue, default 10000
      max-queue-size: 10000

    # Default subject used when sending with QmqTemplate, default value default_subject
    template:
      default-subject: my_default_sub

    # Consumer settings: thread pool used for consuming messages (optional)
    consumer:
      # Thread name prefix, default qmq-process
      thread-name-prefix: my-qmq-process-
      # Core pool size, default 2
      core-pool-size: 2
      # Maximum pool size, default 2
      max-pool-size: 2
      # Thread pool queue capacity, default 1000
      queue-capacity: 1000

    # Message subject and consumer group settings (optional)
    # The entries below can be referenced from the QmqConsumer annotation
    # through expressions such as ${spring.qmq.subject.sub1}
    subject:
      sub1: qmq_sub1
      sub2: qmq_sub2
      sub3: qmq_sub3
      # more subject ...
    group:
      group1: qmq_group1
      group2: qmq_group2
      group3: qmq_group3
      # more group ...

logging:
  level:
    # Log level for qmq-spring-boot-starter
    xin.wjtree.qmq: trace

server:
  port: 8989

发送消息

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.MessageSendStateListener;
import xin.wjtree.qmq.QmqTemplate;
import xin.wjtree.qmq.constant.QmqTimeUnit;
import xin.wjtree.qmq.internal.QmqAlias;
import xin.wjtree.qmq.internal.QmqIgnore;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Example integration test that publishes a message through {@link QmqTemplate}.
 *
 * <p>The test sends one {@link User} payload and then blocks until the send-state
 * listener reports an outcome. It fails if the send is reported as failed or if
 * no outcome arrives within {@link #SEND_TIMEOUT_SECONDS}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class JunitTest {
    /**
     * Upper bound, in seconds, on how long {@link #send()} waits for the send callback.
     * NOTE(review): chosen generously to cover producer retries — tune for your broker.
     */
    private static final long SEND_TIMEOUT_SECONDS = 60L;

    @Resource
    private QmqTemplate template;

    /**
     * Sends a delayed message and waits for the producer callback.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws AssertionError if the send fails or no callback arrives in time
     */
    @Test
    public void send() throws InterruptedException {
        // Released once by whichever listener callback fires first.
        CountDownLatch latch = new CountDownLatch(1);
        // Set only by onSuccess, so a failed send is distinguishable from a successful one.
        AtomicBoolean sent = new AtomicBoolean(false);

        // Fluent (chained) style.
        template
                // Message subject; when omitted, the default subject is used.
                .subject("sub1")
                // Send-state listener. It is only needed here so the test can wait for the
                // outcome; leave it out if you have no custom requirement.
                .listener(new MessageSendStateListener() {
                    @Override
                    public void onSuccess(Message m) {
                        sent.set(true);
                        latch.countDown();
                    }

                    @Override
                    public void onFailed(Message m) {
                        latch.countDown();
                    }
                })
                // Deliver the message after a 10-second delay.
                .delay(QmqTimeUnit.TEN_SECONDS)
                // Alternatively, schedule delivery at a fixed time (2019-07-30 00:46:00):
                //                .delay(LocalDateTime.of(2019, 7, 30, 0, 46, 0))
                // Send the message; a Map or an arbitrary entity object is supported.
                .send(getUser());

        // Block until a callback releases the latch, but never hang the build forever.
        if (!latch.await(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            throw new AssertionError(
                    "No send callback received within " + SEND_TIMEOUT_SECONDS + " seconds");
        }
        if (!sent.get()) {
            throw new AssertionError("QMQ reported the message send as failed");
        }
    }

    /**
     * Builds the sample payload used by {@link #send()}.
     *
     * @return a fully populated {@link User}
     */
    public User getUser() {
        User user = new User();
        user.setId(100000000001L);
        user.setName("张三");
        user.setAge(120);
        user.setSchool("北京大学");
        user.setCompany("中石油");
        user.setDuty("行政总裁");
        // String constructor keeps the decimal value exact.
        user.setSalary(new BigDecimal("1000000"));
        user.setEnable(true);
        return user;
    }

    /**
     * Sample message payload.
     *
     * <p>NOTE(review): {@code @QmqAlias} presumably renames the field in the outgoing
     * message and {@code @QmqIgnore} presumably excludes it — confirm against the starter.
     */
    public static class User {
        @QmqAlias("user_id")
        private Long id;

        private String name;

        private Integer age;

        @QmqAlias("school_name")
        private String school;

        private String company;

        @QmqIgnore
        private String duty;

        private BigDecimal salary;

        private Boolean enable;

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public String getSchool() {
            return school;
        }

        public void setSchool(String school) {
            this.school = school;
        }

        public String getCompany() {
            return company;
        }

        public void setCompany(String company) {
            this.company = company;
        }

        public String getDuty() {
            return duty;
        }

        public void setDuty(String duty) {
            this.duty = duty;
        }

        public BigDecimal getSalary() {
            return salary;
        }

        public void setSalary(BigDecimal salary) {
            this.salary = salary;
        }

        public Boolean getEnable() {
            return enable;
        }

        public void setEnable(Boolean enable) {
            this.enable = enable;
        }
    }
}

消费消息

启用消费者模式

在配置类上添加 EnableQmq 注解,包括 appCode 和 metaServer 属性

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import qunar.tc.qmq.consumer.annotation.EnableQmq;

/**
 * Application entry point with QMQ consumer support switched on.
 *
 * <p>The {@code appCode} and {@code metaServer} attributes of {@code @EnableQmq} are
 * supplied as placeholders for the {@code spring.qmq.app-code} and
 * {@code spring.qmq.meta-server} properties.
 */
@EnableQmq(
        appCode = "${spring.qmq.app-code}",
        metaServer = "${spring.qmq.meta-server}")
@SpringBootApplication
public class QmqApplication {
    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(QmqApplication.class, args);
    }
}

配置消费监听器

在方法上添加 QmqConsumer 注解,包括 subject,consumerGroup,executor 等属性
executor = QmqHelper.EXECUTOR_NAME 表示消费线程池的 BeanName,该值固定为 qmqExecutor

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.consumer.annotation.QmqConsumer;
import xin.wjtree.qmq.constant.QmqHelper;

/**
 * Example QMQ consumer component.
 *
 * <p>The subscribed subject and consumer group are given as placeholders for the
 * {@code spring.qmq.subject.sub1} and {@code spring.qmq.group.group1} properties; the
 * executor is referenced through {@code QmqHelper.EXECUTOR_NAME}.
 */
@Slf4j
@Component
public class QmqLinstener {
    /**
     * Logs the subject and attributes of each received message.
     *
     * @param message the received message; it is cast to {@link BaseMessage} to read
     *                its attributes
     */
    @QmqConsumer(
            subject = "${spring.qmq.subject.sub1}",
            consumerGroup = "${spring.qmq.group.group1}",
            executor = QmqHelper.EXECUTOR_NAME)
    public void onMessage(Message message) {
        final String subject = message.getSubject();
        final Object attrs = ((BaseMessage) message).getAttrs();
        log.info("QMQ 消费主题:{},消费消息:{}", subject, attrs);
    }
}

Versions

Version
1.0.2
1.0.1
1.0.0