SpringBoot集成Eclipse Mosquitto的实现示例
作者:墨鸦_Cormorant
本文主要介绍了SpringBoot集成Eclipse Mosquitto的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
添加 MQTT 客户端依赖
在 Spring Boot 项目的 pom.xml 中添加 Eclipse Paho MQTT 客户端依赖(主流的 MQTT Java 客户端):
<!-- MQTT 客户端 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
配置 MQTT 连接参数
在 application.yml(或 application.properties)中配置 Mosquitto 连接信息:
mqtt: # 是否启用 enable: true # Mosquitto 服务地址(非加密端口),若启用 TLS 加密,使用 ssl://localhost:8883 broker: tcp://localhost:1883 # 客户端唯一标识(建议加随机数避免冲突) client-id: springboot-mqtt-client # 认证用户名(Mosquitto 启用认证时必填) username: user1 # 认证密码 password: 123456 # 默认 QoS 等级(0/1/2) defalut-qos: 1 # 心跳间隔(秒) keep-alive: 60
实现 MQTT 客户端(发布 + 订阅)
MQTT 客户端配置类
配置实体类
import lombok.Data;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
@Data
@Component
@RefreshScope
@ConfigurationProperties(prefix = "mqtt")
@ConditionalOnProperty(name = "mqtt.enable", havingValue = "true")
public class MqttProperties {
/**
* 是否启用
*/
private boolean enable;
/**
* Mosquitto 服务地址(非加密端口)。若启用 TLS 加密,使用 ssl://localhost:8883
*/
private String broker;
/**
* 客户端唯一标识(建议加随机数避免冲突)
*/
private String clientId;
/**
* 认证用户名(Mosquitto 启用认证时必填)
*/
private String username;
/**
* 认证密码
*/
private String password;
/**
* 默认 QoS 等级(0/1/2),非关键数据用 QoS 0,重要状态用 QoS 1,核心控制指令用 QoS 2
*/
private int defaultQos;
/**
* 心跳间隔(秒)
*/
private int keepAlive = 60;
}
配置类
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
@Slf4j
@Configuration
@ConditionalOnBean(MqttProperties.class)
public class MqttConfig {
private final MqttProperties mqttProp;
public MqttConfig(MqttProperties mqttProp) {
this.mqttProp = mqttProp;
}
/**
* 创建 MQTT 客户端实例
*/
@Bean
public MqttClient mqttClient() throws MqttException {
// 客户端 ID 建议添加随机数,避免重复连接
String clientIdWithRandom = mqttProp.getClientId() + "_" + System.currentTimeMillis();
MqttClient client = new MqttClient(mqttProp.getBroker(), clientIdWithRandom, new MemoryPersistence());
// 配置连接参数
MqttConnectOptions options = new MqttConnectOptions();
if (StringUtils.hasText((mqttProp.getUsername())))
options.setUserName(mqttProp.getUsername());
if (StringUtils.hasText((mqttProp.getPassword())))
options.setPassword(mqttProp.getPassword().toCharArray());
options.setKeepAliveInterval(mqttProp.getKeepAlive());
// 自动重连
options.setAutomaticReconnect(true);
// 不清除会话(保留订阅关系和未确认消息)
options.setCleanSession(false);
// 连接回调(处理连接状态)
client.setCallback(new MqttCallback() {
/**
* 连接断开时触发,可在此实现重连逻辑
*/
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT 连接断开,原因:{}", cause.getMessage());
}
/**
* 收到订阅的消息时触发,用于处理业务逻辑(如存储数据到数据库)
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 接收消息回调(订阅的主题有消息时触发)
String content = new String(message.getPayload());
log.debug("收到消息 - 主题:{},内容:{}", topic, content);
// TODO 业务逻辑
}
/**
* 消息发布完成后触发,可用于确认消息已送达
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 消息发布完成回调
try {
log.debug("消息发布成功,主题:{}", token.getTopics()[0]);
} catch (Exception e) {
log.error("", e);
}
}
});
// 连接到 Mosquitto
client.connect(options);
log.info("MQTT 连接成功:{}", mqttProp.getClientId());
return client;
}
}
发布和订阅工具类
消息发布工具类
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
/**
* MQTT 消息发布工具类
*/
@Slf4j
@Component
@ConditionalOnBean(MqttProperties.class)
public class MqttPublisher {
private final MqttClient mqttClient;
private final MqttProperties mqttProp;
public MqttPublisher(MqttClient mqttClient, MqttProperties mqttProp) {
this.mqttClient = mqttClient;
this.mqttProp = mqttProp;
}
/**
* 发布消息到指定主题
* @param topic 主题
* @param content 消息内容
*/
public void publish(String topic, String content) throws MqttException {
publish(topic, content, mqttProp.getDefaultQos());
}
/**
* 发布消息到指定主题(自定义QoS)
* @param topic 主题
* @param content 消息内容
* @param qos QoS等级
*/
public void publish(String topic, String content, int qos) throws MqttException {
if (!mqttClient.isConnected()) {
mqttClient.reconnect(); // 若断开连接,尝试重连
}
log.debug("发布消息,主题:{},内容:{}, QoS等级:{}", topic, content, qos);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
mqttClient.publish(topic, message);
}
}
消息订阅工具类
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
/**
* MQTT 消息订阅工具类
*/
@Slf4j
@Component
@ConditionalOnBean(MqttProperties.class)
public class MqttSubscriber {
private final MqttClient mqttClient;
private final MqttProperties mqttProp;
public MqttSubscriber(MqttClient mqttClient, MqttProperties mqttProp) {
this.mqttClient = mqttClient;
this.mqttProp = mqttProp;
}
/**
* 订阅指定主题
* @param topic 主题(支持通配符,如 sensor/+)
*/
public void subscribe(String topic) throws MqttException {
subscribe(topic, mqttProp.getDefaultQos());
}
/**
* 订阅指定主题(自定义QoS)
* @param topic 主题
* @param qos QoS等级
*/
public void subscribe(String topic, int qos) throws MqttException {
if (!mqttClient.isConnected()) {
mqttClient.reconnect();
}
mqttClient.subscribe(topic, qos);
log.info("已订阅主题:{},QoS等级:{}", topic, qos);
}
/**
* 取消订阅主题
* @param topic 主题
*/
public void unsubscribe(String topic) throws MqttException {
mqttClient.unsubscribe(topic);
log.info("已取消订阅主题:{}", topic);
}
}
测试 MQTT 功能
创建一个测试控制器,验证消息发布和订阅:
import com.blackcrow.test.mqtt.config.MqttPublisher;
import com.blackcrow.test.mqtt.config.MqttSubscriber;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MqttTestController {
@Autowired
private MqttPublisher mqttPublisher;
@Autowired
private MqttSubscriber mqttSubscriber;
@Value("${mqtt.default-topic:topic/temp}")
private String defaultTopic;
/**
* 订阅主题
*/
@GetMapping("/subscribe")
public String subscribe(@RequestParam(required = false) String topic) {
try {
String targetTopic = topic != null ? topic : defaultTopic;
mqttSubscriber.subscribe(targetTopic);
return "订阅成功:" + targetTopic;
} catch (MqttException e) {
return "订阅失败:" + e.getMessage();
}
}
/**
* 发布消息
*/
@GetMapping("/publish")
public String publish(@RequestParam(required = false) String topic, @RequestParam String message) {
try {
String targetTopic = topic != null ? topic : defaultTopic;
mqttPublisher.publish(targetTopic, message);
return "发布成功:主题=" + targetTopic + ",消息=" + message;
} catch (MqttException e) {
return "发布失败:" + e.getMessage();
}
}
}
到此这篇关于SpringBoot集成Eclipse Mosquitto的实现示例的文章就介绍到这了,更多相关SpringBoot集成Eclipse Mosquitto内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
