springboot整合mqtt的步骤示例详解
作者:泓山
MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,适用于物联网设备之间的通信,本文介绍Spring Boot整合MQTT的实现,涵盖依赖引入、YML配置、配置类创建、自定义注解及使用示例,感兴趣的朋友跟随小编一起看看吧
使用场景:
mqtt可用于消息发送接收,一方面完成系统解耦,一方面可用于物联网设备的数据采集和指令控制
话不多说,下面直接干货
1、引入依赖包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2、yml配置
若需要搭建mqtt服务教程,留言我下期出哦!
spring:
  application:
    name: device-control
  profiles:
    active: local
device:
  mqtt:
    enable: true
    username: admin
    password: 123456
    host-url: tcp://192.168.1.12:1883 # mqtt服务连接tcp地址
    in-client-id: ${random.value} # 随机值,使出入站 client ID 不同
    out-client-id: ${random.value}
    client-id: ${random.int} # 客户端Id,不能相同,采用随机数 ${random.value}
    default-topic: pubDevice # 默认主题
    timeout: 60 # 超时时间
    keepalive: 60 # 保持连接
    clearSession: true # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
3、创建配置
创建MqttAutoConfiguration
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import java.util.concurrent.ThreadPoolExecutor;
@AutoConfiguration
@ConditionalOnProperty(value = "device.mqtt.enable", havingValue = "true")
@IntegrationComponentScan
public class MqttAutoConfiguration {
@Resource
MqttProperties mqttProperties;
@Resource
MqttMessageHandle mqttMessageHandle;
/**
 * Builds the MQTT client factory shared by the inbound adapter and the outbound handler.
 *
 * <p>Connection options are taken from {@code mqttProperties}: broker URIs, optional
 * credentials, connection timeout, keep-alive interval and the clean-session flag.
 *
 * <p>NOTE(review): {@code clearSession} is a primitive boolean, so when the property is
 * not configured it binds to {@code false}, while Paho's own default is a clean session.
 * Confirm the property is always set in the deployed configuration.
 *
 * @return the client factory carrying the configured connection options
 */
@Bean
public MqttPahoClientFactory mqttPahoClientFactory() {
    MqttConnectOptions options = new MqttConnectOptions();
    // host-url may list several broker URIs separated by commas.
    options.setServerURIs(mqttProperties.getHostUrl().split(","));

    // Credentials are optional; skipping them avoids an NPE on a missing password.
    if (mqttProperties.getUsername() != null) {
        options.setUserName(mqttProperties.getUsername());
    }
    if (mqttProperties.getPassword() != null) {
        options.setPassword(mqttProperties.getPassword().toCharArray());
    }

    // An unset int property binds to 0; keep the Paho defaults in that case.
    if (mqttProperties.getTimeout() > 0) {
        options.setConnectionTimeout(mqttProperties.getTimeout());
    }
    if (mqttProperties.getKeepalive() > 0) {
        options.setKeepAliveInterval(mqttProperties.getKeepalive());
    }
    options.setCleanSession(mqttProperties.isClearSession());

    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(options);
    return factory;
}
/**
 * Creates the inbound MQTT channel adapter.
 *
 * <p>The adapter connects with the inbound client id and subscribes to every topic listed
 * (comma-separated) in the default-topic property.
 *
 * @param factory the shared MQTT client factory
 * @return the message-driven channel adapter
 */
@Bean
public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory) {
    String inboundClientId = mqttProperties.getInClientId();
    String[] subscribedTopics = mqttProperties.getDefaultTopic().split(",");
    return new MqttPahoMessageDrivenChannelAdapter(inboundClientId, factory, subscribedTopics);
}
/**
 * Inbound flow: receives messages from MQTT and hands them to {@code mqttMessageHandle}
 * on the dedicated MQTT thread pool.
 *
 * <p>The executor is injected as a parameter instead of calling
 * {@code mqttThreadPoolTaskExecutor()} directly. This class is an {@code @AutoConfiguration},
 * whose bean methods are not proxied, so a direct call would build a second pool that the
 * container does not manage rather than return the bean.
 *
 * <p>NOTE(review): if several {@code ThreadPoolTaskExecutor} beans exist, resolution relies
 * on the parameter name matching the bean name; add a qualifier if the build does not
 * retain parameter names.
 *
 * @param adapter the inbound channel adapter; its completion timeout and QoS are set here
 * @param mqttThreadPoolTaskExecutor the executor bean that runs message handling
 * @return the inbound integration flow
 */
@Bean
public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter,
        ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor) {
    adapter.setCompletionTimeout(5000);
    adapter.setQos(1);
    return IntegrationFlows.from(adapter)
            .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor))
            .handle(mqttMessageHandle)
            .get();
}
/**
 * Thread pool used to process inbound MQTT messages.
 *
 * @return an initialized executor with 50 core threads, at most 200 threads, a queue of
 *         1000 tasks, a 300-second idle keep-alive and a caller-runs rejection policy
 */
@Bean
public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    // Upper bound on the number of threads.
    pool.setMaxPoolSize(200);
    // Core pool size.
    pool.setCorePoolSize(50);
    // Maximum queue length.
    pool.setQueueCapacity(1000);
    // Seconds an idle thread is kept alive.
    pool.setKeepAliveSeconds(300);
    // When the pool rejects a task, the submitting thread runs it.
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    pool.initialize();
    return pool;
}
/**
 * Outbound flow: publishes messages arriving on {@code mqttOutboundChannel} to MQTT.
 *
 * <p>The handler uses the outbound client id, sends asynchronously, and falls back to the
 * first entry of the comma-separated default-topic property when a message names no topic.
 *
 * @param factory the shared MQTT client factory
 * @return the outbound integration flow
 */
@Bean
public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {
    MqttPahoMessageHandler outbound =
            new MqttPahoMessageHandler(mqttProperties.getOutClientId(), factory);
    outbound.setAsync(true);
    outbound.setConverter(new DefaultPahoMessageConverter());
    String[] configuredTopics = mqttProperties.getDefaultTopic().split(",");
    outbound.setDefaultTopic(configuredTopics[0]);
    return IntegrationFlows.from("mqttOutboundChannel")
            .handle(outbound)
            .get();
}
}
创建MqttGateway
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Lazy;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Component
@Lazy
@ConditionalOnProperty(value = "device.mqtt.enable", havingValue = "true")
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
 * Sends a payload through the gateway with the target topic passed as the
 * {@code MqttHeaders.TOPIC} header.
 *
 * @param topic the topic header value
 * @param data the message payload
 * @author lwt
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
/**
 * Sends a payload through the gateway with the target topic and QoS passed as the
 * {@code MqttHeaders.TOPIC} and {@code MqttHeaders.QOS} headers.
 *
 * @param topic the topic header value
 * @param Qos the QoS header value
 * @param data the message payload
 * @author lwt
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data);
}
创建MqttMessageHandle
import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
@Slf4j
@AutoConfiguration
public class MqttMessageHandle implements MessageHandler {
public static Map<String, Object> mqttServices;
/**
 * Handles a message by delegating to {@link #getMqttTopicService(Message)}, which
 * dispatches it by topic.
 *
 * @param message the message to handle
 * @throws MessagingException declared by the {@code MessageHandler} contract
 */
@Override
public void handleMessage(Message<?> message) throws MessagingException {
getMqttTopicService(message);
}
/**
 * Returns the beans annotated with {@code @MqttService}, keyed by bean name.
 *
 * <p>The lookup is performed on first use and the result is kept in the static
 * {@code mqttServices} field for later calls.
 *
 * @return the cached map of {@code @MqttService} beans
 */
public Map<String, Object> getMqttServices() {
    if (mqttServices != null) {
        return mqttServices;
    }
    mqttServices = SpringUtil.getConfigurableBeanFactory().getBeansWithAnnotation(MqttService.class);
    return mqttServices;
}
/**
 * Dispatches an inbound message to a handler method chosen by topic.
 *
 * <p>Reads the received topic from the {@code mqtt_receivedTopic} header, then looks through
 * every {@code @MqttService} bean for a method annotated with {@code @MqttTopic} whose
 * pattern matches the topic (see {@link #isMatch(String, String)}). The first handler that
 * is invoked successfully ends the dispatch; a handler that fails is logged and the search
 * continues.
 *
 * <p>The bean's whole class hierarchy is scanned, not only the direct superclass, so
 * handlers are found both on subclass-based proxies and on plain, unproxied beans. The
 * method is invoked on the bean instance itself, so proxy interceptors still apply.
 *
 * @param message the inbound message; ignored when it carries no received-topic header
 */
public void getMqttTopicService(Message<?> message) {
    // Route by the topic the message was received on.
    String receivedTopic = message.getHeaders().get("mqtt_receivedTopic", String.class);
    if (receivedTopic == null || receivedTopic.isEmpty()) {
        return;
    }
    for (Object service : getMqttServices().values()) {
        for (Class<?> type = service.getClass();
                type != null && type != Object.class;
                type = type.getSuperclass()) {
            for (Method method : type.getDeclaredMethods()) {
                MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);
                if (handleTopic == null || !isMatch(receivedTopic, handleTopic.value())) {
                    continue;
                }
                try {
                    method.invoke(service, receivedTopic, message);
                    return;
                } catch (IllegalAccessException e) {
                    // Keep the cause in the log instead of printing it to stderr.
                    log.error("代理炸了", e);
                } catch (InvocationTargetException e) {
                    log.error("执行 {} 方法出现错误", handleTopic.value(), e);
                }
            }
        }
    }
}
/**
 * Tells whether an actual topic matches a subscription pattern.
 *
 * <p>Both strings are split into levels on {@code '_'}. In the pattern, {@code +} matches
 * exactly one level and {@code #} matches that level and everything after it. Without a
 * {@code #}, the topic must have exactly as many levels as the pattern, so {@code a_b}
 * does not match {@code a_b_c}. Trailing empty levels are significant ({@code a_} is not
 * {@code a}).
 *
 * <p>NOTE(review): the MQTT specification uses {@code '/'} as the level separator; the
 * {@code '_'} separator is kept as-is here. Confirm it is intentional.
 *
 * @param topic the actual topic of the received message
 * @param pattern the subscribed topic, which may contain wildcards
 * @return {@code true} if the topic matches the pattern; {@code false} if either is null
 */
public static boolean isMatch(String topic, String pattern) {
    if (topic == null || pattern == null) {
        return false;
    }
    // An identical string always matches, and a lone "#" matches every topic.
    if (topic.equals(pattern) || "#".equals(pattern)) {
        return true;
    }
    // Limit -1 keeps trailing empty levels, which split() would otherwise drop.
    String[] topicLevels = topic.split("_", -1);
    String[] patternLevels = pattern.split("_", -1);
    for (int i = 0; i < patternLevels.length; i++) {
        String level = patternLevels[i];
        if ("#".equals(level)) {
            // Multi-level wildcard: everything from here on matches.
            return true;
        }
        if (i >= topicLevels.length) {
            // The pattern has more levels than the topic.
            return false;
        }
        if (!"+".equals(level) && !level.equals(topicLevels[i])) {
            return false;
        }
    }
    // No "#" was seen, so the topic must not have extra levels beyond the pattern.
    return topicLevels.length == patternLevels.length;
}
}
创建MqttProperties
import lombok.Data;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "device.mqtt")
@Data
@AutoConfiguration
public class MqttProperties {
/**
 * User name.
 */
private String username;
/**
 * Password.
 */
private String password;
/**
 * Connection address.
 */
private String hostUrl;
/**
 * Inbound client id.
 */
private String inClientId;
/**
 * Outbound client id.
 */
private String outClientId;
/**
 * Client id.
 */
private String clientId;
/**
 * Default topic to connect to.
 */
private String defaultTopic;
/**
 * Timeout. NOTE(review): the unit is presumably seconds — confirm.
 */
private int timeout;
/**
 * Keep-alive setting. NOTE(review): presumably an interval in seconds, not a count — confirm.
 */
private int keepalive;
/** Whether to clear the session. */
private boolean clearSession;
}
创建MqttConstants
public class MqttConstants {
// NOTE(review): looks like a cache/store key for device info — confirm against its users.
public static final String MQTT_DEVICE_INFO = "mqtt:device:info";
// Topic name "pubDevice".
public static final String TOPIC_PUB_DEVICE = "pubDevice";
// Topic name "subDevice".
public static final String TOPIC_SUB_DEVICE = "subDevice";
}
创建初始化
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Slf4j
@Component
@ConditionalOnProperty(value = "device.mqtt.enable", havingValue = "true")
public class InitMqttSubscriberTopic {
@Resource
MqttSubscriberService mqttSubscriberService;
/**
 * Registers the {@code pubDevice} topic with the subscriber service once this bean has
 * been constructed.
 *
 * @throws IllegalStateException if the topic cannot be added; the original exception is
 *         kept as the cause
 */
@PostConstruct
public void initSubscriber() {
    try {
        mqttSubscriberService.addTopic(MqttConstants.TOPIC_PUB_DEVICE);
    } catch (Exception e) {
        // Fail fast with context; the cause carries the full stack trace.
        throw new IllegalStateException(
                "Failed to subscribe to MQTT topic " + MqttConstants.TOPIC_PUB_DEVICE, e);
    }
}
}
4、自定义注解
创建MqttService
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {
/**
 * Alias for the {@code value} attribute of {@link Component}; defaults to an empty string.
 */
@AliasFor(annotation = Component.class)
String value() default "";
}
创建MqttTopic
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {
/**
 * Topic name; defaults to an empty string.
 */
String value() default "";
}
创建如图:

5、使用示例
import cn.hutool.extra.spring.SpringUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@MqttService
public class MqttTopicHandle {
/**
 * Handles a message received on the {@code pubDevice} topic.
 *
 * @param topic the topic the message was received on
 * @param message the received message; its payload is logged and converted to a string
 */
@SneakyThrows
@MqttTopic("pubDevice")
@Transactional(rollbackFor = Exception.class)
public void receive(String topic, Message<?> message) {
    final Object payload = message.getPayload();
    log.info("message:{}", payload);
    String value = payload.toString();
    // Business logic goes here.
}
/**
 * Sends a message to the given topic through {@code MqttGateway}.
 *
 * <p>A {@code true} result means the gateway call returned without throwing; it is not a
 * confirmation that the broker received the message. Failures are logged instead of being
 * silently discarded.
 *
 * @param topic the target topic
 * @param message the payload to send
 * @return {@code true} if the gateway call completed, {@code false} if it threw
 */
@Transactional(rollbackFor = Exception.class)
public Boolean send(String topic, String message) {
    try {
        MqttGateway mqttGateway = SpringUtil.getBean(MqttGateway.class);
        mqttGateway.sendToMqtt(topic, message);
        return true;
    } catch (Exception e) {
        // Record the failure so a false result can be diagnosed.
        log.error("Failed to send MQTT message to topic {}", topic, e);
        return false;
    }
}
}
到此这篇关于springboot整合mqtt的文章就介绍到这了,更多相关springboot整合mqtt内容请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!
