MQTT Java 客户端
1. mqtt相关介绍及文档
参考文档:
EMQ X: https://docs.emqx.io/tutorial/v3/cn/
EMQ X-共享订阅:https://docs.emqx.io/tutorial/v3/cn/advanced/share_subscribe.html
MQTT 一些概念: https://blog.csdn.net/u013151053/article/details/81366677
MQTT使用: https://www.cnblogs.com/sxkgeek/p/9140180.html
demo:
java:easy-mqtt/test目录 和 easy-mqtt-springboot-starter目录
js: mqtt-with-websocket.html
2. gradle 添加依赖
implementation "com.github.giveme0101:easy-mqtt:${easymqtt.version}"
implementation "com.github.giveme0101:easy-mqtt-spring-boot-starter:${easymqttSpringbootStarter.version}"
3. 使用 - 集成 SpringBoot
1. application.yml 配置
mqtt:
# 生产者,发送端
producer:
# client-id不填动态生成
client-id: producer-1
server-uri: tcp://127.0.0.1:1883
username: admin
password: admin
# clean-session默认true,每次使用新session
clean-session: true
# qos默认值2,仅发送一次
qos: 2
# retained默认false,broker不暂存消息
retained: false
# 消费者,接收端
consumer:
# 固定client-id, clean-session=false 可以接受离线消息
# client-id不填动态生成
client-id: consumer-1
# clean-session默认true,每次使用新session
clean-session: false
server-uri: tcp://127.0.0.1:1883
username: admin
password: admin
topics:
- /test/#
- /topic/#
- /message/#
# 一个qos对应一个topic,数量必须相等。qos可以都不定义,不定义使用mqtt默认值:1
# qos取值 0,1,2
# qos = 0, 确保至多收到一次消息(只负责传送,发送过后就不管数据的传送情况)
# = 1, 确保至少收到一次消息
# = 2, 确保只收到一次消息 (使用两阶段提交2pc保证一执行)
qos:
- 2
- 2
- 2
2. Java 代码
producer:
1. config类添加
@EnableMqttProducer
2.
public xxxClass{
@Autowired
private MqttProducer producer;
...{
producer.publish("/topic/1", "message");
producer.publish("/topic/part/2", new HashMap());
producer.publish("/topic/3", new Object());
producer.publish("/cn/qd/news", new Object());
producer.publish("/cn/qd/weather", new HashMap());
}
}
consumer:
1. 如果配置1)和2)两种方式,则在topic满足条件的情况下都能收到消息
1)
a: 启动类添加@EnableMqttConsumer并将@MqttListener所在的包配置到scanBasePackages:
@EnableMqttConsumer(scanBasePackages = {"demo.listener"})
b: 类实现IMqttListener接口并添加@MqttListener并指定一个或多个topic(此topic必须在application.yml中注册),此topic可以模糊匹配,
如/topic/+/+ , 一个+可以匹配一级,可以匹配/topic/qd/1,不能匹配/topic/x
/topic/# , #必须放在结尾,匹配/tocpic/开头的所有topic
@MqttListener(topic = {"/topic/#", "/cn/qd/+"})
public class TopicListener implements IMqttListener {
@Override
publc void onMessage(String topic, MqttMessage message){
System.out.println("topic name: " + topic + ",message: " + message);
}
}
2)
a: 启动类添加@EnableMqttConsumer
b: 类实现IMqttMessageObserver接口并添加@Component等扫描注解,可以收到所有在application.yml中配置的topic消息
@Component
public class MqttMessageCenter implements IMqttMessageObserver {
@Override
public void onMessage(final String topic, final MqttMessage message) {
System.out.println("from MqttMessageCenter: " + topic + " - " + message);
}
}
4. 使用 -- 不集成SpringBoot
详见:
easy-mqtt测试用例, DynamicMqttClient.java
5. 共享订阅(基于EMQ X的实现)
创建共享订阅的两种方法:
主题前缀 实例
$queue/:topic sub $queue/up/data
$share/:group/:topic sub $share/group/up/data
共享订阅由三部分组成:
静态共享订阅标识符($queue 与 $share)
组标识符(可选)
特定标准 MQTT 主 题(实际接收消息的主题)
$queue 和 $share 之间的差异:
$queue 之后的主题中所有消息将轮流发送到客户端,
$share 之后,您可以添加不同的组,例如:
$share/group_1/topic
$share/group_2/topic
$share/group_3/topic
当 EMQ X 向 topic 发送消息时,每个组都会收到该消息,并依次将其发送到该组中的设备。
共享订阅的使用:
共享订阅仅支持消费者,在yml中的mqtt.consumer.topics的每一个topic前添加 $queue/ 或 $share/group_name/, 注意包含结尾的‘/’, 如:
$share/group_1/cn/dq/# ==> cn/dq/#
$share/group_1//cn/dq/+ ==> /cn/dq/+
$queue//cn/dq/news ==> /cn/dq/news