springboot整合mqtt实现消息订阅和推送功能
作者:洛阳泰山
mica-mqtt-client-spring-boot-starter是一个方便、高效、可靠的MQTT客户端启动器,适用于需要使用MQTT协议进行消息通信的Spring Boot应用程序,这篇文章主要介绍了springboot整合mqtt实现消息订阅和推送功能,需要的朋友可以参考下
前言
mica-mqtt-client-spring-boot-starter是一个基于Spring Boot的MQTT客户端启动器,它集成了mica-mqtt客户端,提供了在Spring Boot应用程序中使用MQTT协议进行消息通信的能力。以下是关于mica-mqtt-client-spring-boot-starter的简介:
特点:
- 简单易用:通过Spring Boot的自动配置,可以轻松地集成到Spring应用程序中,并使用Spring的注解或Java配置进行MQTT客户端的配置。
- 低延迟:支持MQTT协议,能够实现实时消息通信,具有较低的延迟。
- 高性能:基于mica-mqtt客户端,具有高效的消息处理和网络通信能力,能够处理大量的并发连接和消息。
- 集群支持:支持基于Redis的发布/订阅模式的集群,可以实现多个节点之间的消息同步和负载均衡。
- 使用场景:适用于需要使用MQTT协议进行消息通信的物联网、实时应用、移动应用等领域。可以在云端或边缘端使用,实现设备与设备之间、设备与服务器之间的消息通信。
- 集成方式:通过在Spring Boot项目中添加相关依赖,并配置MQTT客户端的相关参数,即可快速集成mica-mqtt-client-spring-boot-starter。具体的使用方法可以参考官方文档和示例代码。
- 注意事项:在使用过程中需要注意确保网络连接的稳定性和安全性,并根据实际需求进行适当的配置和优化。同时,也需要关注数据安全和隐私保护等方面的问题。
总之,mica-mqtt-client-spring-boot-starter是一个方便、高效、可靠的MQTT客户端启动器,适用于需要使用MQTT协议进行消息通信的Spring Boot应用程序。
功能
- 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。
- 支持 websocket mqtt 子协议(支持 mqtt.js)。
- 支持 http rest api,http api 文档详见[1]。
- 支持 MQTT client 客户端。
- 支持 MQTT server 服务端。
- 支持 MQTT client、server 共享订阅支持(捐助VIP版采用 topic 树存储,跟 topic 数无关,百万 topic 性能依旧)。
- 支持 MQTT 遗嘱消息。
- 支持 MQTT 保留消息。
- 支持自定义消息(mq)处理转发实现集群。
- MQTT 客户端 阿里云 mqtt 连接 demo。
- 支持 GraalVM 编译成本机可执行程序。
- 支持 Spring boot 项目快速接入(mica-mqtt-spring-boot-starter)。
- mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。
- 基于 redis pub/sub 实现集群,详见 mica-mqtt-broker 模块[2]
教程
添加依赖
在springboot项目中添加maven依赖:
<!-- https://mvnrepository.com/artifact/net.dreamlu/mica-mqtt-client-spring-boot-starter --> <dependency> <groupId>net.dreamlu</groupId> <artifactId>mica-mqtt-client-spring-boot-starter</artifactId> <version>2.2.8</version> </dependency>
配置参数
在spring配置文件中配置mqtt相关参数,配置如下:
mqtt: server: enabled: false # 是否开启服务端,默认:false client: enabled: true # 是否开启客户端,默认:false ip: 172.16.10.203 # 连接的服务端 ip ,默认:127.0.0.1 port: 1883 # 端口:默认:1883 name: Mica2-Mqtt2-Client # 名称,默认:Mica-Mqtt-Client clientId: coalface_safety_3d # 客户端Id(非常重要,一般为设备 sn,不可重复) user-name: admin # 认证的用户名 你的用户名 password: 3@!cHy@j # 认证的密码 timeout: 5 # 连接超时时间,单位:秒,默认:5秒 reconnect: true # 是否重连,默认:true re-interval: 5000 # 重连时间,默认 5000 毫秒 version: MQTT_3_1 # mqtt 协议版本,默认:3.1.1 read-buffer-size: 8092 # 接收数据的 buffer size,默认:8092 max-bytes-in-message: 8092 # 消息解析最大 bytes 长度,默认:8092 buffer-allocator: heap # 堆内存和堆外内存,默认:堆内存 keep-alive-secs: 60 # keep-alive 心跳维持时间,单位:秒 clean-session: false # mqtt clean session,默认:true will-message: # 消息遗嘱 qos: at_least_once ssl: enabled: false # 是否开启 ssl 认证,2.1.0 开始支持双向认证 keystore-path: # 可选参数:ssl 双向认证 keystore 目录,支持 classpath:/ 路径。 keystore-pass: # 可选参数:ssl 双向认证 keystore 密码 truststore-path: # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。 truststore-pass: # 可选参数:ssl 双向认证 truststore 密码
注意:ssl 存在三种情况
服务端开启ssl | 客户端 |
---|---|
ClientAuth 为 NONE(不需要客户端验证) | 仅仅需要开启 ssl 即可不用配置证书 |
ClientAuth 为 OPTIONAL(与客户端协商) | 需开启 ssl 并且配置 truststore 证书 |
ClientAuth 为 REQUIRE (必须的客户端验证) | 需开启 ssl 并且配置 truststore、 keystore证书 |
创建订阅
创建一个mqtt订阅消息监听类,例如SimulationSubscriber,代码如下:
import com.alibaba.fastjson.JSONObject; import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import org.tio.utils.buffer.ByteBufferUtil; /** * @author tarzan */ @Component @Slf4j public class SimulationSubscriber { @MqttClientSubscribe("tuoyuan/publish/zj/#") public void zjOne(String topic, byte[] payload){ String[] strs=topic.split("/"); String ID=strs[strs.length-1]; log.info("topic:{} payload:{} ID:{}", topic, new String(payload, StandardCharsets.UTF_8),ID); } @MqttClientSubscribe("/sys/${deviceName}/thing/sub/register") public void thingSubRegister(String topic, byte[] payload) { // 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 + // 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。 logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8)); } @MqttClientSubscribe("/tianma/publish/cmj") public void cmj(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/zj") public void zj(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/gbj") public void gbj(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/ltl") public void ltl(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/ntl") public void ntl(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/ccl") public void ccl(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } }
- @Header(“topic”) 和@Payload 注解可以省略
- tuoyuan/publish/zj/# 中的# 是通配符
- 在MQTT协议中,#是一个通配符,代表匹配该主题的所有子主题。例如,如果你订阅了主题sports/baseball/#,那么你将接收到所有以sports/baseball/开头的主题的消息。
- 请注意,通配符#只能用于多层的主题名称中,并且只能用于最后一个级别。例如,sports/baseball/#是有效的,但#sports/baseball或sports/#/baseball都是无效的。
- 除了#之外,MQTT协议还支持一个单层通配符+,它代表只匹配该级别的主题。例如,如果你订阅了主题sports/baseball/+,那么你将只接收到以sports/baseball/开头,且后面跟着至少一个字符的主题的消息。
- 请注意,使用通配符时需要谨慎,因为它们可能会匹配到意外的主题。确保你的订阅主题明确,并且只匹配你感兴趣的主题。
- /sys/${deviceName}/thing/sub/register
- 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
- 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。
创建发布
创建一个mqtt消息发布接口类,例如 MqttTestController,代码如下:
import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate; import org.springblade.core.secure.annotation.NoToken; import org.springblade.core.tool.api.R; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.nio.charset.StandardCharsets; /** * @author tarzan */ @RestController @Api(tags = "mqtt测试") @NoToken @RequestMapping("/mqtt") @AllArgsConstructor @Slf4j public class MqttTestController { private final MqttClientTemplate mqttClientTemplate; @ApiOperation(value = "消息发送") @PostMapping("/publish") private R<Boolean> publish(String topic, String msg) { return R.status(mqttClientTemplate.publish(topic, msg.getBytes(StandardCharsets.UTF_8))); } }
接口测试
接口调用
控制台输出
到此这篇关于springboot整合mqtt实现消息订阅和推送的文章就介绍到这了,更多相关springboot整合mqtt内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!