SpringBoot整合MQTT协议实现消息订阅与发布功能
作者:灿灿不熬夜
文章介绍了基于MQTT的项目实现,包含依赖配置、启动时固定主题订阅、连接配置类、消息发布与订阅功能、回调处理连接状态及消息,并通过接口测试消息发送与接收,验证EMQX连接状态与主题交互,感兴趣的朋友跟随小编一起看看吧
1、相关依赖 pom.xml文件
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>2、配置文件 application.yml
这里的订阅主题可不要,我这里用于启动的时候就订阅固定主题。适用主题固定的场景。
# MQTT服务地址,端口默认1883 mqtt-broker-url: tcp://127.0.0.1:1883 # 用户名 mqtt-username: admin # 密码 mqtt-password: public # 订阅主题(可以多个) mqtt-default-topic: mqtt/topic_test # 客户端Id mqtt-clientId: can
3、MQTT配置类
用于配置项目启动时就连接MQTT。
@Component
public class MqttConfig {
@Resource
private MqttPushClient mqttPushClient;
@Resource
private MqttSubClient mqttSubClient;
/**
* 用户名
*/
@Value("${mqtt-username}")
private String username;
/**
* 密码
*/
@Value("${mqtt-password}")
private String password;
/**
* 连接地址
*/
@Value("${mqtt-broker-url}")
private String hostUrl;
/**
* 客户Id
*/
@Value("${mqtt-clientId}")
private String clientId;
/**
* 默认连接话题,多个的话用逗号隔开
*/
@Value("${mqtt-default-topic}")
private String defaultTopic;
/**
* 超时时间
*/
private int timeout = 100;
/**
* 保持连接数
*/
private int keepalive = 60;
/**
* 连接至mqtt服务器,获取mqtt连接
*
* @return MqttPushClient
*/
@Bean
public MqttPushClient getMqttPushClient() {
// 连接至mqtt服务器,获取mqtt连接
mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
// 订阅默认主题
mqttSubClient.subScribeDataPublishTopic(defaultTopic);
return mqttPushClient;
}
}4、发布连接类
连接MQTT的方法、发布消息的方法。
@Slf4j
@Component
public class MqttPushClient {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
@Getter
private static MqttClient client;
public static void setClient(MqttClient client) {
MqttPushClient.client = client;
}
/**
* 连接
* @param host mqtt://127.0.0.1:1883
* @param clientId can
* @param username admin
* @param password password
* @param timeout 100
* @param keepalive 60
*/
public void connect(String host, String clientId, String username, String password, int timeout, int keepalive) {
MqttClient client;
try {
client = new MqttClient(host, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
// automaticReconnect 为 true 表示断线自动重连,但仅仅只是重新连接,并不订阅主题;在 connectComplete 回调函数重新订阅
options.setAutomaticReconnect(true);
MqttPushClient.setClient(client);
try {
//设置回调类
client.setCallback(pushCallback);
IMqttToken iMqttToken = client.connectWithResult(options);
boolean complete = iMqttToken.isComplete();
log.error("MQTT连接{}", complete ? "成功" : "失败");
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
}
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
/**
* 关闭MQTT连接
*/
public void close() throws MqttException {
client.disconnect();
client.close();
}
/**
* 发布,默认qos为0,非持久化
*
* @param topic 主题名
* @param pushMessage 消息
*/
public void publish(String topic, String pushMessage) {
publish(0, false, topic, pushMessage);
}
/**
* 发布
* QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
* QoS 1:消息至少传送一次。
* QoS 2:消息只传送一次。
* @param qos
* @param retained
* @param topic
* @param pushMessage
*/
public void publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
// MQTT主题不存在
if (null == mTopic) return;
try {
mTopic.publish(message);
} catch (Exception e) {
log.error("MQTT发送消息异常:", e);
e.printStackTrace();
}
}
}5、订阅类
用于订阅某个或多个主题、取消订阅某个或者多个主题。
@Slf4j
@Component
public class MqttSubClient {
private static final Logger logger = LoggerFactory.getLogger(MqttSubClient.class);
// 订阅多个主题以逗号分开
public void subScribeDataPublishTopic(String defaultTopic) {
//订阅test_queue主题
String[] mqttTopic = defaultTopic.split(",");
for (String s : mqttTopic) {
//订阅主题
subscribe(s, 0);
}
}
/**
* 订阅某个主题,qos默认为0
*
* @param topic 主题
*/
public void subscribe(String topic) {
subscribe(topic, 0);
}
/**
* 订阅某个主题
*
* @param topic 主题名
* @param qos qos
*/
public void subscribe(String topic, int qos) {
try {
MqttClient client = MqttPushClient.getClient();
if (client == null) {
return;
}
client.subscribe(topic, qos);
log.error("MQTT订阅主题:{}", topic);
} catch (MqttException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
/**
* 取消订阅某个主题
* @param topic 要取消订阅的主题名
*/
public void unsubscribe(String topic) {
try {
MqttClient client = MqttPushClient.getClient();
if (client == null || !client.isConnected()) {
return;
}
client.unsubscribe(topic); // 取消订阅
log.error("MQTT取消订阅主题: {}", topic);
} catch (MqttException e) {
log.error("取消订阅失败: {}", e.getMessage());
e.printStackTrace();
}
}
/**
* 批量取消订阅多个主题
* @param topics 主题数组
*/
public void unsubscribe(String[] topics) {
try {
MqttClient client = MqttPushClient.getClient();
if (client == null || !client.isConnected()) {
return;
}
client.unsubscribe(topics); // 取消订阅多个主题
log.error("MQTT取消订阅主题: {}", Arrays.toString(topics));
} catch (MqttException e) {
log.error("取消订阅失败: {}", e.getMessage());
e.printStackTrace();
}
}
}
6、回调类
处理MQTT连接断开重连、订阅主题接收的消息处理。
@Slf4j
@Component
public class PushCallback implements MqttCallback {
@Resource
@Lazy
private MqttPushClient mqttPushClient;
/**
* 连接丢失后,一般在这里面进行重连(重连的逻辑需要自己处理)
* @param cause .
*/
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT连接断开,正在重连:" + cause);
}
/**
* 发送消息,消息到达后处理方法
* @param token .
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.error("deliveryComplete---------{}", token.isComplete());
}
/**
* 订阅主题接收到消息处理方法
* @param topic 主题
* @param message 消息
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
// 订阅主题后得到的消息会执行到这里面,这里在控制台有输出
log.error("MQTT接收消息主题 : {}", topic);
log.error("MQTT接收消息Qos : {}", message.getQos());
log.error("MQTT接收消息内容 : {}", message);
}
}7、启动后,进入EMQX管理页面
程序允许打印连接成功,去EMQX管理页面查看。

EMQX管理页面这里有所有的主题列表。

包括客户端订阅的主题。

8、通过接口给主题发送消息
@RestController
@Slf4j
@RequestMapping("/api")
public class ApiController {
@Resource
private MqttPushClient mqttPushClient;
@GetMapping("/test")
public String getVersions(@RequestParam String topic, @RequestParam String message) {
mqttPushClient.publish(topic, message);
return "ok";
}
}浏览器直接调用,topic:配置文件里面订阅的主题。message:你想发送给主题的消息。

控制台日志打印:发送成功,并且接收到了主题发送的消息。

到此这篇关于SpringBoot整合MQTT协议实现消息订阅与发布功能的文章就介绍到这了,更多相关SpringBoot整合MQTT订阅与发布内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
