java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java对接MQTT协议

Java对接MQTT协议的完整实现示例代码

作者:最业余的程序猿

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛,这篇文章主要介绍了Java对接MQTT协议的完整实现,需要的朋友可以参考下

前言

本文将详细介绍如何使用Java和Spring Integration框架实现MQTT协议的对接。代码包括MQTT客户端的配置、消息的订阅与发布、以及消息的处理逻辑。

前置依赖

<!-- MQTT 依赖 -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

1. MQTT配置类

代码解析

MqttConfig类是MQTT的核心配置类,负责MQTT客户端的初始化、连接选项的设置以及消息通道的创建。

package com.ruoyi.framework.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.io.IOException;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl}); // Broker 地址
        options.setAutomaticReconnect(true); // 自动重连
        factory.setConnectionOptions(options);
        System.out.println("Connecting to broker: " + brokerUrl + " OK.");
        return factory;
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + "-inbound",  // 客户端ID(唯一)
                mqttClientFactory(),   // 使用工厂创建客户端
                "testSub/#" // 订阅的主题
        );
        adapter.setOutputChannelName("mqttInputChannel"); // 关键:绑定到输入通道 消息输出通道
        adapter.setQos(1); // 设置 QoS 级别
        return adapter;
    }

    // 出站适配器(发送)
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                clientId + "-outbound",
                mqttClientFactory()
        );
        handler.setAsync(true);
        handler.setDefaultQos(1);
        return handler;
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel(); // 使用直连通道
    }

    // 出站通道(发送消息)
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

}

1.1 MQTT客户端工厂

mqttClientFactory方法创建了一个MqttPahoClientFactory实例,用于配置MQTT客户端的连接选项。

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[]{brokerUrl}); // 设置MQTT服务端地址
    options.setAutomaticReconnect(true); // 开启自动重连
    factory.setConnectionOptions(options);
    System.out.println("Connecting to broker: " + brokerUrl + " OK.");
    return factory;
}

1.2 MQTT消息订阅适配器

mqttInbound方法创建了一个MqttPahoMessageDrivenChannelAdapter实例,用于订阅MQTT主题并接收消息。

@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            clientId + "-inbound", mqttClientFactory(), "testSub/#");
    adapter.setOutputChannelName("mqttInputChannel"); // 绑定到输入通道
    adapter.setQos(1); // 设置QoS级别
    return adapter;
}

1.3 MQTT消息发布适配器

mqttOutbound方法创建了一个MqttPahoMessageHandler实例,用于发布消息到MQTT主题。

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
            clientId + "-outbound", mqttClientFactory());
    handler.setAsync(true); // 异步发送
    handler.setDefaultQos(1); // 设置QoS级别
    return handler;
}

1.4 消息通道

mqttInputChannelmqttOutboundChannel方法分别创建了输入和输出通道,用于消息的传递。

@Bean
public MessageChannel mqttInputChannel() {
    return new DirectChannel(); // 直连通道
}

@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}

2. MQTT消息监听器

代码解析

MqttMessageListener类负责处理从MQTT主题接收到的消息

package com.ruoyi.framework.mqtt;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    @Autowired
    private IMqttService mqttService;

    // 处理入站消息
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String payload = message.getPayload().toString();
                Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
                log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);

                try {
                    if (topic.startsWith("heartbeat/")) {  //心跳上报
                        mqttService.handleHeartbeat(payload);
                    } else if (topic.startsWith("report/")) {  //数据上报
                        mqttService.handleReport(payload);
                    }
                } catch (Exception e) {
                    log.error("[MQTT] 消息处理失败: {}", e.getMessage());
                }
            }
        };
    }
}

 或Ⅱ

package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.power.domain.protocol.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    @Autowired
    private IMqttService mqttService;

    // 处理入站消息
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String payload = message.getPayload().toString();
                Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
                log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);

                try {
                    if (topic.startsWith("testSub/")) {
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            case MessageConstants.HEART_BEAT:
                                HeartbeatMessage heartbeat = JSON.parseObject(payload, HeartbeatMessage.class);
                                mqttService.handleHeartbeat(heartbeat);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.ALARM:
                                AlarmMessage alarm = JSON.parseObject(payload, AlarmMessage.class);
                                mqttService.handleAlarm(alarm);
                                break;
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            default:
                                System.err.println("Unknown message type: " + baseMsg.getType());
                        }
                    } else if (topic.startsWith("report/allpoints")) {
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            // 如果没收到callAck 则代表采集器没收到callRead
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            case MessageConstants.MULTIVALUESET_ACK:
                                MultiValueSetMessage multvaluesetAck = JSON.parseObject(payload, MultiValueSetMessage.class);
                                mqttService.handleMultiValueSet(multvaluesetAck);
                                break;
                        }
                    }
                } catch (Exception e) {
                    log.error("[MQTT] 消息处理失败: {}", e.getMessage());
                }
            }
        };
    }
}

2.1 消息处理逻辑

handler方法是一个MessageHandler,用于处理从mqttInputChannel接收到的消息。

@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // 获取主题
            String payload = message.getPayload().toString(); // 获取消息内容
            Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
            log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);

            try {
                if (topic.startsWith("testSub/")) {
                    // 处理订阅主题为testSub的消息
                    BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                    switch (baseMsg.getType()) {
                        case MessageConstants.HEART_BEAT:
                            mqttService.handleHeartbeat(JSON.parseObject(payload, HeartbeatMessage.class));
                            break;
                        case MessageConstants.REPORT:
                            mqttService.handleReport(JSON.parseObject(payload, ReportMessage.class));
                            break;
                        // 其他消息类型的处理逻辑
                    }
                } else if (topic.startsWith("report/allpoints")) {
                    // 处理订阅主题为report/allpoints的消息
                }
            } catch (Exception e) {
                log.error("[MQTT] 消息处理失败: {}", e.getMessage());
            }
        }
    };
}

3. MQTT消息网关

代码解析

MqttMessageGateway接口提供了一个简单的发送消息的方法。

package com.ruoyi.framework.mqtt;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMessageGateway {

    void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);

}

使用示例:

@Autowired
private MqttMessageGateway mqttMessageGateway;

public void publishMessage() {
    mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}

4. MQTT服务接口

代码解析

IMqttService接口定义了处理不同类型消息的方法。

package com.ruoyi.framework.mqtt;

import com.ruoyi.power.domain.protocol.*;

public interface IMqttService {

    /**
     * 处理心跳数据
     * @param heartbeat MQTT 消息内容
     */
    void handleHeartbeat(HeartbeatMessage heartbeat);

    /**
     * 处理上报数据
     * @param report MQTT 消息内容
     */
    void handleReport(ReportMessage report);

    /**
     * 服务器发送遥控命令到采集器
     * 服务器发送遥调命令到采集器
     * @param controlMessage 遥控命令
     */
    void sendControl(ControlMessage controlMessage);

    /**
     * 处理上报仪表报警
     * @param alarm 报警内容
     * @return String 配置内容
     */
    void handleAlarm(AlarmMessage alarm);

    /**
     * 下发控制命令到指定网关
     * @param saleid 配电站ID
     * @param gateid 网关ID
     */
    void sendCallRead(String saleid, String gateid, String startTime, String endTime);

    /**
     * 采集器响应召读命令(响应召读命令回复包,不代表召读时间段的数据一定存在,采集器收到召读命令后首先回复
     * 此数据包,下一不再查找相应历史数据, 存在即发送,不存在不发送 )
     * @param baseMsg 采集器响应召读命令(
     */
    void handleCallReadAck(BaseMessage baseMsg);

    /**
     * 采集器发送执行结果到服务器
     * @param controlAck
     */
    void handleControlAck(ControlMessage controlAck);

    /**
     * 由服务器发布获取数据命令到采集器
     * @param baseMessage
     */
    void getCurrentData(BaseMessage baseMessage);

    /**
     *
     * @param multiValueSetMessage
     */
    void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage);

    /**
     * 处理相应采集器接收到服务器的命令
     * @param multiValueSetMessage
     */
    void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage);

}

5. 使用说明

5.1 配置MQTT参数

application.yml中配置MQTT的相关参数:

mqtt:
  broker-url: tcp://127.0.0.1:1883
  client-id: mqtt-client-123

5.2 实现IMqttService接口

创建一个类实现IMqttService接口,并提供具体的业务逻辑。例如:

package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.common.constant.OperationConstants;
import com.ruoyi.common.constant.ResultConstants;
import com.ruoyi.common.utils.bean.BeanUtils;
import com.ruoyi.power.config.CustomIdGenerator;
import com.ruoyi.power.domain.*;
import com.ruoyi.power.domain.protocol.*;
import com.ruoyi.power.mapper.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.*;

@Slf4j
@Service
public class MqttServiceImpl implements IMqttService {

    @Autowired
    private MqttMessageGateway mqttGateway;

    @Autowired
    private HeartBeatMapper heartbeatMapper;

    @Autowired
    private GatewayInfoMapper gatewayInfoMapper;

    @Autowired
    private ReportMapper reportMapper;

    @Autowired
    private ReportMeterMapper reportMeterMapper;

    @Autowired
    private AlarmMapper alarmMapper;

    @Autowired
    private AlarmDetailMapper alarmDetailMapper;

    @Autowired
    private ControlMapper controlMapper;

    // 处理心跳数据
    @Override
    public void handleHeartbeat(HeartbeatMessage heartbeat) {
        try {
            // 心跳存储到数据库
            HeartBeat heartBeat = new HeartBeat();
            heartBeat.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
            heartBeat.setGateId(heartbeat.getGateid());
            heartBeat.setType(heartbeat.getType());
            heartBeat.setSaleId(heartbeat.getSaleid());
            heartBeat.setTime(heartbeat.getTime());
            heartBeat.setOperation(heartbeat.getOperation());
            heartbeatMapper.insertHeartBeat(heartBeat);
            log.info("[心跳数据] 存储成功: substationId={}, gatewayId={}",
                    heartbeat.getSaleid(), heartbeat.getGateid());

            // 查询或创建网关记录
            GatewayInfo gatewayInfo = gatewayInfoMapper.selectOne(Wrappers.<GatewayInfo>lambdaQuery().eq(GatewayInfo::getGateid, heartbeat.getGateid()));
            if(ObjectUtils.isNull(gatewayInfo)) {
                createNewGateway(heartbeat.getSaleid(), heartbeat.getGateid());
            } else {
                gatewayInfo.setLastHeartbeatTime(LocalDateTime.now());
                gatewayInfo.setUpdateTime(LocalDateTime.now());
                int updated = gatewayInfoMapper.updateGatewayInfo(gatewayInfo);
                if(updated == 0) {
                    log.warn("心跳更新冲突 saleid:{}, gateid:{}", heartbeat.getSaleid(), heartbeat.getGateid());
                }
            }

            // 如果网关请求心跳,响应心跳
            sendHeartbeat(heartbeat.getSaleid(), heartbeat.getGateid(), heartbeat);
        } catch (Exception e) {
            log.error("[心跳数据] 处理失败: {}", e.getMessage());
        }
    }

    // 创建新网关记录
    private void createNewGateway(String saleid, String gateid) {
        GatewayInfo newGateway = new GatewayInfo();
        newGateway.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        newGateway.setSaleid(saleid);
        newGateway.setGateid(gateid);
        newGateway.setLastHeartbeatTime(LocalDateTime.now());
        newGateway.setStatus("0");
        newGateway.setCheckInterval(60L); // 默认间隔
        newGateway.setCreateTime(LocalDateTime.now());
        gatewayInfoMapper.insertGatewayInfo(newGateway);
    }

    // 下发心跳
    private void sendHeartbeat(String saleid, String gateid, HeartbeatMessage heartbeat) {
        String topic = String.format("report/allpoints", saleid, gateid);
        heartbeat.setOperation(OperationConstants.TIME);
        mqttGateway.sendMessage(topic, JSON.toJSONString(heartbeat));
        log.info("[配置下发] topic={}, config={}", topic, JSON.toJSONString(heartbeat));
    }

    // 处理上报数据
    @Override
    public void handleReport(ReportMessage report) {
        try {
            // 存储到仪表信息表 转换为仪表信息表(meterMapper)
            String reportId = createReportData(report);
            // 批量存储仪表数据
            List<ReportMeter> meterEntities = report.getMeter().stream()
                    .map(m -> {
                        ReportMeter entity = new ReportMeter();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setReportId(reportId);
                        entity.setMeterId(m.getId());
                        entity.setStatus(m.getStatus());
                        entity.setName(m.getName());
                        entity.setValuesJson(JSON.toJSONString(m.getValues()));
                        return entity;
                    }).toList();
            for (ReportMeter meter : meterEntities) {
                reportMeterMapper.insertReportMeter(meter);
            }
            log.info("[上报数据] 存储成功: substationId={}, gatewayId={}",
                    report.getSaleid(), report.getGateid());
        } catch (Exception e) {
            log.error("[上报数据] 处理失败: {}", e.getMessage());
        }
    }

    // 创建新数据记录
    private String createReportData(ReportMessage report) {
        Report rep = new Report();
        rep.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        rep.setSaleid(report.getSaleid());
        rep.setGateid(report.getGateid());
        rep.setTime(report.getTime());
        rep.setType(report.getType());
        rep.setSequence(report.getSequence());
        rep.setSource(report.getSource());
        rep.setCreateTime(LocalDateTime.now());
        reportMapper.insert(rep);
        return rep.getId();
    }

    // 下发控制命令
    @Override
    public void sendControl(ControlMessage controlMessage) {
        ControlMessage message = new ControlMessage();
        message.setSaleid(controlMessage.getSaleid());
        message.setGateid(controlMessage.getGateid());
        message.setType(controlMessage.getType());
        message.setCuuid(LocalDateTime.now().toString());
        message.setTime(LocalDateTime.now());
        message.setMeterid(controlMessage.getMeterid());
        message.setName(controlMessage.getName());
        message.setFunctionid(controlMessage.getFunctionid());
        message.setValue(controlMessage.getValue());
        // 存储到控制记录表
        createControl(controlMessage);
        String topic = String.format("report/allpoints", message);
        mqttGateway.sendMessage(topic, JSON.toJSONString(message));
        log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(message));
    }

    private void createControl(ControlMessage controlMessage) {
        Control control = new Control();
        control.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        control.setSaleid(controlMessage.getSaleid());
        control.setGateid(controlMessage.getGateid());
        control.setType(controlMessage.getType());
        control.setCuuid(controlMessage.getCuuid());
        control.setTime(controlMessage.getTime());
        control.setMeterid(controlMessage.getMeterid());
        control.setName(controlMessage.getName());
        control.setFunctionid(controlMessage.getFunctionid());
        control.setValue(controlMessage.getValue());
        control.setResult(controlMessage.getResult());
        control.setErrordesc(controlMessage.getErrordesc());
        controlMapper.insertControl(control);
    }

    @Override
    public void handleAlarm(AlarmMessage alarmMessage) {
        try {
            // 存储报警信息表 转换为报警信息表(alarmMapper)
            String alarmId = createAlarmData(alarmMessage);
            // 批量存储仪表数据
            List<AlarmDetail> alarmEntities = alarmMessage.getFunction().stream()
                    .map(m -> {
                        AlarmDetail entity = new AlarmDetail();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setAlarmId(alarmId);
                        entity.setPtId(m.getId());
                        entity.setAlarmType(m.getAlarmType());
                        entity.setLabel(m.getLabel());
                        entity.setCurrentValue(m.getCurrentValue());
                        entity.setSettingValue(m.getSettingValue());
                        entity.setLevel(m.getLevel());
                        return entity;
                    }).toList();
            for (AlarmDetail alarm : alarmEntities) {
                alarmDetailMapper.insertAlarmDetail(alarm);
            }
            log.info("[上报数据] 存储成功: substationId={}, gatewayId={}",
                    alarmMessage.getSaleid(), alarmMessage.getGateid());

        } catch (Exception e) {
            log.error("[上报数据] 处理失败: {}", e.getMessage());
        }

    }

    @Override
    public void sendCallRead(String saleid, String gateid, String startTime, String endTime) {
        HashMap<String, String> protocol = new HashMap<>();
        protocol.put("saleid", saleid);
        protocol.put("gateid", gateid);
        protocol.put("type", MessageConstants.CALL_READ);
        protocol.put("time", String.valueOf(LocalDateTime.now()));
        protocol.put("startTime", startTime);
        protocol.put("endTime", endTime);
        String topic = String.format("report/allpoints", saleid, gateid);
        mqttGateway.sendMessage(topic, JSON.toJSONString(protocol));
        log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(protocol));
    }

    @Override
    public void handleCallReadAck(BaseMessage baseMsg) {
        Report report = new Report();
        report.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        report.setSaleid(baseMsg.getSaleid());
        report.setGateid(baseMsg.getGateid());
        report.setTime(baseMsg.getTime());
        report.setType(baseMsg.getType());
        reportMapper.insert(report);
    }

    @Override
    public void handleControlAck(ControlMessage controlAck) {
        if(ResultConstants.FAILURE.equals(controlAck.getResult())) {
            createControl(controlAck);
            // 配置或设备问题,记录错误并报警
            log.error("控制失败(不可重试): {}", controlAck.getErrordesc());
        } else if(ResultConstants.SUCCESS.equals(controlAck.getResult())) {
            createControl(controlAck);
            log.info("控制成功: {}", controlAck.getCuuid());
        }
    }

    @Override
    public void getCurrentData(BaseMessage baseMessage) {
        String topic = String.format("report/allpoints", baseMessage.getSaleid(), baseMessage.getGateid());
        mqttGateway.sendMessage(topic, JSON.toJSONString(baseMessage));
        log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(baseMessage));
    }

    @Override
    public void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage) {

    }

    @Override
    public void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
        String topic = String.format("report/allpoints", multiValueSetMessage.getSaleid(), multiValueSetMessage.getGateid());
        ControlMessage controlMessage = new ControlMessage();
        try {
            mqttGateway.sendMessage(topic, JSON.toJSONString(multiValueSetMessage));
            log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            if(ResultConstants.SUCCESS.equals(multiValueSetMessage.getResult())) {
                BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                createControl(controlMessage);
                log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            } else if(ResultConstants.FAILURE.equals(multiValueSetMessage.getResult())){
                BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                createControl(controlMessage);
                log.error("[控制失败] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            }
        } catch (Exception e) {
            log.error("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
        }

    }

    private String createAlarmData(AlarmMessage alarmMessage) {
        Alarm alarm = new Alarm();
        alarm.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        alarm.setSaleid(alarmMessage.getSaleid());
        alarm.setGateid(alarmMessage.getGateid());
        alarm.setTime(alarmMessage.getTime());
        alarm.setType(alarmMessage.getType());
        alarm.setSequence(alarmMessage.getSequence());
        alarm.setName(alarmMessage.getName());
        alarm.setMeterid(alarmMessage.getMeterid());
        alarm.setCreateTime(LocalDateTime.now());
        alarmMapper.insert(alarm);
        return alarm.getId();
    }

}

5.3 发送MQTT消息

通过MqttMessageGateway发送消息:

@Autowired
private MqttMessageGateway mqttMessageGateway;

public void sendTestMessage() {
    mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}

6. 总结

本文介绍了如何使用Spring Integration框架实现MQTT协议的对接,包括客户端的配置、消息的订阅与发布、以及消息的处理逻辑。通过上述代码,您可以快速实现Java与MQTT的集成,并根据业务需求扩展消息的处理逻辑。

到此这篇关于Java对接MQTT协议的文章就介绍到这了,更多相关Java对接MQTT协议内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文