easy-mqtt

easy mqtt

License

License

GroupId

GroupId

com.github.giveme0101
ArtifactId

ArtifactId

easy-mqtt
Last Version

Last Version

1.0.0
Release Date

Release Date

Type

Type

jar
Description

Description

easy-mqtt
easy mqtt

Download easy-mqtt

How to add to project

<!-- https://jarcasting.com/artifacts/com.github.giveme0101/easy-mqtt/ -->
<dependency>
    <groupId>com.github.giveme0101</groupId>
    <artifactId>easy-mqtt</artifactId>
    <version>1.0.0</version>
</dependency>
// https://jarcasting.com/artifacts/com.github.giveme0101/easy-mqtt/
implementation 'com.github.giveme0101:easy-mqtt:1.0.0'
// https://jarcasting.com/artifacts/com.github.giveme0101/easy-mqtt/
implementation ("com.github.giveme0101:easy-mqtt:1.0.0")
'com.github.giveme0101:easy-mqtt:jar:1.0.0'
<dependency org="com.github.giveme0101" name="easy-mqtt" rev="1.0.0">
  <artifact name="easy-mqtt" type="jar" />
</dependency>
@Grapes(
@Grab(group='com.github.giveme0101', module='easy-mqtt', version='1.0.0')
)
libraryDependencies += "com.github.giveme0101" % "easy-mqtt" % "1.0.0"
[com.github.giveme0101/easy-mqtt "1.0.0"]

Dependencies

compile (9)

Group / Artifact Type Version
junit : junit jar 4.12
org.slf4j : slf4j-api jar 1.7.26
com.alibaba : fastjson jar 1.2.58
org.reflections : reflections jar 0.9.12
org.apache.commons : commons-lang3 jar 3.9
org.apache.commons : commons-collections4 jar 4.3
io.netty : netty jar 3.10.6.Final
org.aspectj : aspectjweaver jar 1.9.4
org.eclipse.paho : org.eclipse.paho.client.mqttv3 jar 1.2.0

runtime (1)

Group / Artifact Type Version
org.slf4j : slf4j-simple jar 1.7.26

Project Modules

There are no modules declared in this project.

MQTT Java 客户端

License

1. mqtt相关介绍及文档

参考文档:
    EMQ X: https://docs.emqx.io/tutorial/v3/cn/
    EMQ X-共享订阅:https://docs.emqx.io/tutorial/v3/cn/advanced/share_subscribe.html
    MQTT 一些概念: https://blog.csdn.net/u013151053/article/details/81366677
    MQTT使用: https://www.cnblogs.com/sxkgeek/p/9140180.html
demo:
    java:easy-mqtt/test目录 和 easy-mqtt-springboot-starter目录
    js: mqtt-with-websocket.html

2. gradle 添加依赖

Maven central 单独使用

      implementation "com.github.giveme0101:easy-mqtt:${easymqtt.version}"

Maven central 集成springboot

      implementation "com.github.giveme0101:easy-mqtt-spring-boot-starter:${easymqttSpringbootStarter.version}"

3. 使用 - 集成 SpringBoot

1. application.yml 配置

      mqtt:
      
        # 生产者,发送端
        producer:
          # client-id不填动态生成
          client-id: producer-1
          server-uri: tcp://127.0.0.1:1883
          username: admin
          password: admin
          # clean-session默认true,每次使用新session
          clean-session: true
          # qos默认值2,仅发送一次
          qos: 2
          # retained默认false,broker不暂存消息
          retained: false
          
        # 消费者,接收端
        consumer:
          
          # 固定client-id, clean-session=false 可以接受离线消息
          # client-id不填动态生成
          client-id: consumer-1
          # clean-session默认true,每次使用新session
          clean-session: false
          
          server-uri: tcp://127.0.0.1:1883
          username: admin
          password: admin
          topics:
              - /test/#
              - /topic/#
              - /message/#
              
          # 一个qos对应一个topic,数量必须相等。qos可以都不定义,不定义使用mqtt默认值:1
          # qos取值 0,1,2
          # qos = 0, 确保至多收到一次消息(只负责传送,发送过后就不管数据的传送情况)
          #     = 1, 确保至少收到一次消息
          #     = 2, 确保只收到一次消息 (使用两阶段提交2pc保证一执行)
          qos:
              - 2
              - 2
              - 2

2. Java 代码

producer:

1. config类添加
    @EnableMqttProducer
2.
public xxxClass{
    @Autowired
    private MqttProducer producer;
    
    ...{
        producer.publish("/topic/1", "message");
        producer.publish("/topic/part/2", new HashMap());
        producer.publish("/topic/3", new Object());
        producer.publish("/cn/qd/news", new Object());
        producer.publish("/cn/qd/weather", new HashMap());
    }
}

consumer:

    
1. 如果配置1)和2)两种方式,则在topic满足条件的情况下都能收到消息
    1) 
        a: 启动类添加@EnableMqttConsumer并将@MqttListener所在的包配置到scanBasePackages:
            @EnableMqttConsumer(scanBasePackages = {"demo.listener"})
        b: 类实现IMqttListener接口并添加@MqttListener并指定一个或多个topic(此topic必须在application.yml中注册),此topic可以模糊匹配,
        如/topic/+/+ , 一个+可以匹配一级,可以匹配/topic/qd/1,不能匹配/topic/x
          /topic/#   , #必须放在结尾,匹配/tocpic/开头的所有topic
    
        @MqttListener(topic = {"/topic/#", "/cn/qd/+"})
        public class TopicListener implements IMqttListener {
            @Override
            publc void onMessage(String topic, MqttMessage message){
                System.out.println("topic name: " + topic + ",message: " + message);
            }
        }  
    
    2)
        a: 启动类添加@EnableMqttConsumer
        b: 类实现IMqttMessageObserver接口并添加@Component等扫描注解,可以收到所有在application.yml中配置的topic消息
        @Component
        public class MqttMessageCenter implements IMqttMessageObserver {
            @Override
            public void onMessage(final String topic, final MqttMessage message) {
                System.out.println("from MqttMessageCenter: " + topic + " - " + message);
            }
        }
 

4. 使用 -- 不集成SpringBoot

详见:
    easy-mqtt测试用例, DynamicMqttClient.java

5. 共享订阅(基于EMQ X的实现)

创建共享订阅的两种方法:

    主题前缀                 实例
    $queue/:topic	        sub $queue/up/data
    $share/:group/:topic	sub $share/group/up/data

共享订阅由三部分组成:

    静态共享订阅标识符($queue 与 $share)
    组标识符(可选)
    特定标准 MQTT 主 题(实际接收消息的主题)

$queue 和 $share 之间的差异:

    $queue 之后的主题中所有消息将轮流发送到客户端,
    $share 之后,您可以添加不同的组,例如:
    $share/group_1/topic
    $share/group_2/topic
    ​$share/group_3/topic
    当 EMQ X 向 topic 发送消息时,每个组都会收到该消息,并依次将其发送到该组中的设备。

共享订阅的使用:

共享订阅仅支持消费者,在yml中的mqtt.consumer.topics的每一个topic前添加 $queue/ 或 $share/group_name/, 注意包含结尾的‘/’, 如:
    $share/group_1/cn/dq/#    ==> cn/dq/#
    $share/group_1//cn/dq/+   ==> /cn/dq/+
    $queue//cn/dq/news        ==> /cn/dq/news

Versions

Version
1.0.0