Springboot实现MQTT通信的示例代码
作者:就不告诉你嘿嘿
本文主要介绍了Springboot实现MQTT通信的示例代码,包含了MQTT协议的特点和工作原理等,具有一定的参考价值,感兴趣的可以了解一下
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模型的轻量级消息传输协议,常用于物联网(IoT)场景中。它设计简洁、带宽占用少,非常适合资源受限的设备和网络环境。
一、MQ协议
MQTT 特点
轻量级协议:
- 设计简单,占用带宽少,特别适合嵌入式设备和不稳定的网络环境。
发布/订阅模型:
- 客户端通过主题(Topic)发布消息,订阅者通过主题接收消息,彼此不直接通信。
可靠性保障:
- 提供三种服务质量(QoS)等级,确保消息可靠传输:
- QoS 0:至多一次(不确认,可能丢失)。
- QoS 1:至少一次(需要确认,但可能重复)。
- QoS 2:仅一次(确保消息不丢失且不重复)。
- 提供三种服务质量(QoS)等级,确保消息可靠传输:
持续连接:
- 使用 TCP/IP 连接,通过心跳包(Keep-Alive)保持连接稳定。
支持离线消息:
- 使用“保留消息”和“持久会话”功能,实现离线设备接收消息。
安全性:
- 支持 SSL/TLS 加密,结合用户名和密码进行身份验证。
MQTT 工作原理
连接:
- 客户端通过
CONNECT
消息向服务器建立连接,服务器返回CONNACK
消息。
- 客户端通过
发布:
- 客户端通过
PUBLISH
消息向服务器发布消息,指定消息的主题。
- 客户端通过
订阅:
- 客户端通过
SUBSCRIBE
消息订阅一个或多个主题,服务器将匹配主题的消息推送给客户端。
- 客户端通过
心跳:
- 客户端和服务器定期发送心跳包(PINGREQ 和 PINGRESP),确保连接有效。
断开:
- 客户端通过
DISCONNECT
消息通知服务器主动断开连接。
- 客户端通过
MQTT 主要应用场景
物联网(IoT):
- 设备状态监控、数据收集和远程控制。
智能家居:
- 控制家电、监控传感器数据。
车联网:
- 实时车辆数据传输、位置追踪。
移动应用:
- 消息推送、实时聊天。
工业领域:
- 设备数据采集和分析。
MQTT 配置与注意事项
主题命名:
- 使用层级结构(如
/iot/device/status
),便于管理。 - 避免过于复杂的主题结构。
- 使用层级结构(如
QoS 选择:
- 根据应用需求选择适合的 QoS 等级,平衡可靠性和性能。
安全措施:
- 启用 SSL/TLS 加密。
- 配置用户名和密码,限制匿名连接。
- 控制主题的访问权限。
性能优化:
- 控制消息大小,减少带宽占用。
- 调整心跳时间,优化连接稳定性。
二、MQTT服务器搭建
1、在springboot项目工程pom文件下引入相关依赖
<!--mqtt相关依赖--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
2、修改application.yml配置文件
spring: application: name: provider #MQTT配置信息 mqtt: #MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开 url: tcp://127.0.0.1:1883 #用户名 username: guest #密码 password: guest #客户端id(不能重复) client: id: provider-id #MQTT默认的消息推送主题,实际可在调用接口是指定 default: topic: topic server: port: 8080
3、消息发布者客户端配置
package com.three.demo.mqtt.config; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import java.time.LocalDateTime; @Configuration @Slf4j public class MqttClientConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; /** * 客户端对象 */ private MqttAsyncClient client; /** * 在bean初始化后连接到服务器 */ @PostConstruct public void init() { connect(); } /** * 客户端连接服务端 */ public void connect() { //连接设置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表示每次连接服务器都是以新的身份 options.setCleanSession(false); //设置连接用户名 options.setUserName(username); //设置连接密码 options.setPassword(password.toCharArray()); //设置超时时间,单位为秒 options.setConnectionTimeout(60); //设置心跳时间 单位为秒,表示服务器每隔 1.5*10秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); // 开启自动重连 options.setAutomaticReconnect(true); // 设置最大重连时间间隔 (可选),单位是毫秒,设置为 5000 表示最多等待 5 秒再尝试重连 options.setMaxReconnectDelay(5000); //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false); try { //创建MQTT客户端对象 client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence()); //设置回调 client.setCallback(mqttClientCallBack); // 使用异步连接 client.connect(options, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { log.info("MQTT连接成功"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log.error("MQTT连接失败:" + exception.getMessage()); } }); } catch (MqttException e) { log.error("mqtt连接失败。。" + e.getMessage()); } } public void publish(int qos, boolean retained) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(pushLog.getData().getBytes()); try { // 使用异步客户端发布消息,并处理结果 client.publish(pushLog.getTopic(), mqttMessage, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { System.out.println("发送成功"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log.error("发送失败:" + exception.getMessage()); } }); } catch (MqttException e) { log.error("发送失败:" + e.getMessage()); } } /** * 断开连接 */ public void disConnect() { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } }
4、消息发布客户端回调
package com.three.demo.mqtt.config; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttClientCallBack implements MqttCallback { @Value("${spring.mqtt.client.id}") private String clientId; /** * 与服务器断开的回调 */ @Override public void connectionLost(Throwable cause) { log.error(clientId + "与服务器断开连接!!" + cause.getMessage()); } /** * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); System.out.println(client.getClientId()+"发布消息成功!"); } }
5、创建控制器测试发布信息
package com.three.demo.mqtt.controller; import com.three.demo.mqtt.config.MqttClientConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class SendController { @Autowired private MqttClientConfig client; @RequestMapping("/sendMessage") @ResponseBody public String sendMessage(int qos,boolean retained,String topic,String message){ try { client.publish(qos, retained, topic, message); return "发送成功"; } catch (Exception e) { e.printStackTrace(); return "发送失败"; } } }
6、消息接收者配置
这里我对之前的代码进行改造
/** * 客户端连接服务端 */ public void connect() { //连接设置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表示每次连接服务器都是以新的身份 options.setCleanSession(false); //设置连接用户名 options.setUserName(username); //设置连接密码 options.setPassword(password.toCharArray()); //设置超时时间,单位为秒 options.setConnectionTimeout(60); //设置心跳时间 单位为秒,表示服务器每隔 1.5*10秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); // 开启自动重连 options.setAutomaticReconnect(true); // 设置最大重连时间间隔 (可选),单位是毫秒,设置为 5000 表示最多等待 5 秒再尝试重连 options.setMaxReconnectDelay(5000); //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false); try { //创建MQTT客户端对象 client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence()); //设置回调 client.setCallback(mqttClientCallBack); // 使用异步连接 client.connect(options, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { log.info("MQTT连接成功"); // 连接成功后订阅主题 try { //订阅主题 //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息 int[] qos = {2, 2}; String[] topics = { "/iot/msg/topic1", "/iot/msg/topic2" }; client.subscribe(topics, qos); log.info("订阅主题成功"); } catch (MqttException e) { log.error("订阅主题失败:" + e.getMessage()); } } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log.error("MQTT连接失败:" + exception.getMessage()); } }); } catch (MqttException e) { e.printStackTrace(); log.error("mqtt连接失败。。" + e.getMessage()); } }
然后在消息客户端回调类这里
package com.ruoyi.yyt.mqtt.config; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @Slf4j @Component public class MqttClientCallBack implements MqttCallback { @Value("${spring.mqtt.client.id}") private String clientId; /** * 客户端断开连接的回调 */ @Override public void connectionLost(Throwable throwable) { log.error(clientId + "与服务器断开连接!!" + cause.getMessage()); } /** * 消息到达的回调 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("接收消息主题 : %s",topic)); System.out.println(String.format("接收消息Qos : %d",message.getQos())); System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b",message.isRetained())); } /** * 消息发布成功的回调 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); System.out.println(client.getClientId() + "发布消息成功!"); } }
这个时候我们启动服务,调用测试接口
就可以看到接口返回发布成功,并且能看到后台服务的打印日志了
至此大功告成了!
到此这篇关于Springboot实现MQTT通信的示例代码的文章就介绍到这了,更多相关Springboot MQTT通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!