java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > java代码mqtt接收发送消息

java代码mqtt接收发送消息方式

作者:其妙的太空人

这篇文章主要介绍了java代码mqtt接收发送消息方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

java代码mqtt接收发送消息

mqtt消息第一用到不是太熟悉所以写一篇文章巩固一下。

前提是你已经把mqtt已经安装好,并且启动好了。

首先我们需要两部分代码。

所需依赖

         <!-- mqtt -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

连接mqtt部分的代码块,因为我不需要发送消息所以把发送消息给注释掉了。

package mqttclient.util;
import lombok.extern.slf4j.Slf4j;
import mqttclient.callback.MqttMessageCallback2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Objects;
@Component
@Slf4j
public class MqttClientUtil2 {
    private String username;
    private String password;
    @Value("tcp://127.0.0.1:1883")//这个是安装mqtt的ip以及端口,1883是mqtt默认端口
    private String host;
    @Value("CYT")//这个随便写但是是唯一的。
    private String clientId;
    @Value("cyt/#")这个是mqtt发送消息的咱们要订阅的topic,cyt/#代表以cyt/开始的所有topic都接收
    private String topic;
    @Value("${mqtt.connection.timeout}")//IOT_MQTT_Yield会block住timeout的时间去尝试接收数据,直到timeout才会退出。可以写在这里也可以写在yml配置文件中
    private int timeOut;
    @Value("${mqtt.keep.alive.interval}")
    private int interval;
    @Autowired
    private MqttMessageCallback2 mqttMessageCallback2;
    private MqttClient mqttClient;
    private MqttConnectOptions mqttConnectOptions;
    @PostConstruct
    private void init(){
        connect(host, clientId,topic);
    }
    /**
     * 链接mqtt
     * @param host
     * @param clientId
     */
    private void connect(String host,String clientId,String topic){
        try{
            mqttClient = new MqttClient(host,clientId,new MemoryPersistence());
            mqttConnectOptions = getMqttConnectOptions();
            //设置回调函数
            mqttClient.setCallback(mqttMessageCallback2);
            //链接mqtt
            mqttClient.connect(mqttConnectOptions);
            //订阅消息
            mqttClient.subscribe(topic,2);
        }catch (Exception e){
            log.error("mqtt服务链接异常!");
            e.printStackTrace();
        }
    }
    /**
     * 设置链接对象信息
     * setCleanSession  true 断开链接即清楚会话  false 保留链接信息 离线还会继续发消息
     * @return
     */
    private MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        /*mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());*/
        mqttConnectOptions.setServerURIs(new String[]{host});
        mqttConnectOptions.setKeepAliveInterval(interval);
        mqttConnectOptions.setConnectionTimeout(timeOut);
        mqttConnectOptions.setCleanSession(true);
        return mqttConnectOptions;
    }
    /**
     *mqtt链接状态
     * @return
     */
    private boolean isConnect(){
        if(Objects.isNull(this.mqttClient)){
            return false;
        }
        return mqttClient.isConnected();
    }
    /**
     * 设置重连
     * @throws Exception
     */
    private void reConnect() throws Exception{
        if(Objects.nonNull(this.mqttClient)){
            log.info("mqtt 服务已重新链接...");
            this.mqttClient.connect(this.mqttConnectOptions);
        }
    }
    /**
     * 断开链接
     * @throws Exception
     */
    private void closeConnect() throws Exception{
        if(Objects.nonNull(this.mqttClient)){
            log.info("mqtt 服务已断开链接...");
            this.mqttClient.disconnect();
        }
    }
/*    *//**
     * 发布消息
     * @param topic
     * @param message
     * @param qos
     * @throws Exception
     *//*
    public void sendMessage(String topic,String message,int qos) throws Exception {
        if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(message.getBytes());
            mqttMessage.setQos(qos);
            MqttTopic mqttTopic = mqttClient.getTopic(topic);
            if(Objects.nonNull(mqttTopic)){
                try{
                    MqttDeliveryToken publish = mqttTopic.publish(mqttMessage);
                    if(publish.isComplete()){
                        log.info("消息发送成功---->{}",message);
                    }
                }catch(Exception e){
                    log.error("消息发送异常",e);
                }
            }
        }else{
            reConnect();
        }
    }*/
}

接收消息部分

package mqttclient.callback;
import lombok.extern.slf4j.Slf4j;
import mqttclient.util.ParsingData2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class MqttMessageCallback2 implements MqttCallback {
    /**
     * 链接丢失时处理
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        //可以做重连 或者 其他业务处理
    }
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
		System.out.println("接收到消息topic---->{}"+topic);
		System.out.println("接收到消息topic---->{}"+mqttMessage);
        log.info("接收到消息质量qos---->{}",mqttMessage.getQos());
		System.out.println("接收到消息质量qos---->{}"+mqttMessage.getQos());
        log.info("接收到消息具体信息---->{}",new String(mqttMessage.getPayload()));
		System.out.println("接收到消息具体信息---->{}"+mqttMessage.getPayload());
        //结合业务 编写具体信息即可
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}

这个两个写完之后只要有数据发送过来,这边会自动进行接收打印。

是用mqtt网页版图形化界面进行模拟数据发送。

安装mqtt后打开此网站:http://localhost:18083/

默认账号是:admin / public

登录后这边可以设置中文:

模拟发送:这几个地方不用改动但是一定要点击绿色的连接才可以,进行发送。

需要修改的部分是:

然后点击发送就可以收到信息了。 

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文