java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > springboot整合mqtt消息推送

springboot整合mqtt实现消息订阅和推送功能

作者:洛阳泰山

mica-mqtt-client-spring-boot-starter是一个方便、高效、可靠的MQTT客户端启动器,适用于需要使用MQTT协议进行消息通信的Spring Boot应用程序,这篇文章主要介绍了springboot整合mqtt实现消息订阅和推送功能,需要的朋友可以参考下

前言

mica-mqtt-client-spring-boot-starter是一个基于Spring Boot的MQTT客户端启动器,它集成了mica-mqtt客户端,提供了在Spring Boot应用程序中使用MQTT协议进行消息通信的能力。以下是关于mica-mqtt-client-spring-boot-starter的简介:

特点:

总之,mica-mqtt-client-spring-boot-starter是一个方便、高效、可靠的MQTT客户端启动器,适用于需要使用MQTT协议进行消息通信的Spring Boot应用程序。

功能

教程

添加依赖

在springboot项目中添加maven依赖:

        <!-- https://mvnrepository.com/artifact/net.dreamlu/mica-mqtt-client-spring-boot-starter -->
        <dependency>
            <groupId>net.dreamlu</groupId>
            <artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
            <version>2.2.8</version>
        </dependency>

配置参数

在spring配置文件中配置mqtt相关参数,配置如下:

mqtt:
  server:    
    enabled: false              # 是否开启服务端,默认:false
  client:
    enabled: true               # 是否开启客户端,默认:false
    ip: 172.16.10.203   # 连接的服务端 ip ,默认:127.0.0.1
    port: 1883                  # 端口:默认:1883
    name: Mica2-Mqtt2-Client      # 名称,默认:Mica-Mqtt-Client
    clientId: coalface_safety_3d            # 客户端Id(非常重要,一般为设备 sn,不可重复)
    user-name: admin           # 认证的用户名 你的用户名
    password: 3@!cHy@j       # 认证的密码
    timeout: 5                  # 连接超时时间,单位:秒,默认:5秒
    reconnect: true             # 是否重连,默认:true
    re-interval: 5000           # 重连时间,默认 5000 毫秒
    version: MQTT_3_1           # mqtt 协议版本,默认:3.1.1
    read-buffer-size: 8092      # 接收数据的 buffer size,默认:8092
    max-bytes-in-message: 8092  # 消息解析最大 bytes 长度,默认:8092
    buffer-allocator: heap      # 堆内存和堆外内存,默认:堆内存
    keep-alive-secs: 60         # keep-alive 心跳维持时间,单位:秒
    clean-session: false         # mqtt clean session,默认:true
    will-message:                # 消息遗嘱
      qos: at_least_once
    ssl:
    enabled: false            # 是否开启 ssl 认证,2.1.0 开始支持双向认证
    keystore-path:            # 可选参数:ssl 双向认证 keystore 目录,支持 classpath:/ 路径。
    keystore-pass:            # 可选参数:ssl 双向认证 keystore 密码
    truststore-path:          # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
    truststore-pass:          # 可选参数:ssl 双向认证 truststore 密码

注意:ssl 存在三种情况

服务端开启ssl客户端
ClientAuth 为 NONE(不需要客户端验证)仅仅需要开启 ssl 即可不用配置证书
ClientAuth 为 OPTIONAL(与客户端协商)需开启 ssl 并且配置 truststore 证书
ClientAuth 为 REQUIRE (必须的客户端验证)需开启 ssl 并且配置 truststore、 keystore证书

创建订阅

创建一个mqtt订阅消息监听类,例如SimulationSubscriber,代码如下:

import com.alibaba.fastjson.JSONObject;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.tio.utils.buffer.ByteBufferUtil;
/**
 * @author tarzan
 */
@Component
@Slf4j
public class SimulationSubscriber {
    @MqttClientSubscribe("tuoyuan/publish/zj/#")
    public void zjOne(String topic, byte[] payload){
        String[] strs=topic.split("/");
        String ID=strs[strs.length-1];
        log.info("topic:{} payload:{} ID:{}", topic, new String(payload, StandardCharsets.UTF_8),ID);
    }
	 @MqttClientSubscribe("/sys/${deviceName}/thing/sub/register")
	  public void thingSubRegister(String topic, byte[] payload) {
	    // 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
	    // 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。
	    logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
	  }
    @MqttClientSubscribe("/tianma/publish/cmj")
    public void cmj(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }
    @MqttClientSubscribe("/tianma/publish/zj")
    public void zj(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }
    @MqttClientSubscribe("/tianma/publish/gbj")
    public void gbj(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }
    @MqttClientSubscribe("/tianma/publish/ltl")
    public void ltl(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }
    @MqttClientSubscribe("/tianma/publish/ntl")
    public void ntl(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }
    @MqttClientSubscribe("/tianma/publish/ccl")
    public void ccl(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }
}

创建发布

创建一个mqtt消息发布接口类,例如 MqttTestController,代码如下:

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.springblade.core.secure.annotation.NoToken;
import org.springblade.core.tool.api.R;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
/**
 * @author tarzan
 */
@RestController
@Api(tags = "mqtt测试")
@NoToken
@RequestMapping("/mqtt")
@AllArgsConstructor
@Slf4j
public class MqttTestController {
    private final MqttClientTemplate mqttClientTemplate;
    @ApiOperation(value = "消息发送")
    @PostMapping("/publish")
    private R<Boolean> publish(String topic, String msg) {
        return R.status(mqttClientTemplate.publish(topic, msg.getBytes(StandardCharsets.UTF_8)));
    }
}

接口测试

接口调用

控制台输出

到此这篇关于springboot整合mqtt实现消息订阅和推送的文章就介绍到这了,更多相关springboot整合mqtt内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文