java代码mqtt接收发送消息方式
作者:其妙的太空人
这篇文章主要介绍了java代码mqtt接收发送消息方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
java代码mqtt接收发送消息
mqtt消息第一用到不是太熟悉所以写一篇文章巩固一下。
前提是你已经把mqtt已经安装好,并且启动好了。
首先我们需要两部分代码。
所需依赖
<!-- mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>连接mqtt部分的代码块,因为我不需要发送消息所以把发送消息给注释掉了。
package mqttclient.util;
import lombok.extern.slf4j.Slf4j;
import mqttclient.callback.MqttMessageCallback2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Objects;
@Component
@Slf4j
public class MqttClientUtil2 {
private String username;
private String password;
@Value("tcp://127.0.0.1:1883")//这个是安装mqtt的ip以及端口,1883是mqtt默认端口
private String host;
@Value("CYT")//这个随便写但是是唯一的。
private String clientId;
@Value("cyt/#")这个是mqtt发送消息的咱们要订阅的topic,cyt/#代表以cyt/开始的所有topic都接收
private String topic;
@Value("${mqtt.connection.timeout}")//IOT_MQTT_Yield会block住timeout的时间去尝试接收数据,直到timeout才会退出。可以写在这里也可以写在yml配置文件中
private int timeOut;
@Value("${mqtt.keep.alive.interval}")
private int interval;
@Autowired
private MqttMessageCallback2 mqttMessageCallback2;
private MqttClient mqttClient;
private MqttConnectOptions mqttConnectOptions;
@PostConstruct
private void init(){
connect(host, clientId,topic);
}
/**
* 链接mqtt
* @param host
* @param clientId
*/
private void connect(String host,String clientId,String topic){
try{
mqttClient = new MqttClient(host,clientId,new MemoryPersistence());
mqttConnectOptions = getMqttConnectOptions();
//设置回调函数
mqttClient.setCallback(mqttMessageCallback2);
//链接mqtt
mqttClient.connect(mqttConnectOptions);
//订阅消息
mqttClient.subscribe(topic,2);
}catch (Exception e){
log.error("mqtt服务链接异常!");
e.printStackTrace();
}
}
/**
* 设置链接对象信息
* setCleanSession true 断开链接即清楚会话 false 保留链接信息 离线还会继续发消息
* @return
*/
private MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());*/
mqttConnectOptions.setServerURIs(new String[]{host});
mqttConnectOptions.setKeepAliveInterval(interval);
mqttConnectOptions.setConnectionTimeout(timeOut);
mqttConnectOptions.setCleanSession(true);
return mqttConnectOptions;
}
/**
*mqtt链接状态
* @return
*/
private boolean isConnect(){
if(Objects.isNull(this.mqttClient)){
return false;
}
return mqttClient.isConnected();
}
/**
* 设置重连
* @throws Exception
*/
private void reConnect() throws Exception{
if(Objects.nonNull(this.mqttClient)){
log.info("mqtt 服务已重新链接...");
this.mqttClient.connect(this.mqttConnectOptions);
}
}
/**
* 断开链接
* @throws Exception
*/
private void closeConnect() throws Exception{
if(Objects.nonNull(this.mqttClient)){
log.info("mqtt 服务已断开链接...");
this.mqttClient.disconnect();
}
}
/* *//**
* 发布消息
* @param topic
* @param message
* @param qos
* @throws Exception
*//*
public void sendMessage(String topic,String message,int qos) throws Exception {
if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.getBytes());
mqttMessage.setQos(qos);
MqttTopic mqttTopic = mqttClient.getTopic(topic);
if(Objects.nonNull(mqttTopic)){
try{
MqttDeliveryToken publish = mqttTopic.publish(mqttMessage);
if(publish.isComplete()){
log.info("消息发送成功---->{}",message);
}
}catch(Exception e){
log.error("消息发送异常",e);
}
}
}else{
reConnect();
}
}*/
}接收消息部分
package mqttclient.callback;
import lombok.extern.slf4j.Slf4j;
import mqttclient.util.ParsingData2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class MqttMessageCallback2 implements MqttCallback {
/**
* 链接丢失时处理
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
//可以做重连 或者 其他业务处理
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("接收到消息topic---->{}"+topic);
System.out.println("接收到消息topic---->{}"+mqttMessage);
log.info("接收到消息质量qos---->{}",mqttMessage.getQos());
System.out.println("接收到消息质量qos---->{}"+mqttMessage.getQos());
log.info("接收到消息具体信息---->{}",new String(mqttMessage.getPayload()));
System.out.println("接收到消息具体信息---->{}"+mqttMessage.getPayload());
//结合业务 编写具体信息即可
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}这个两个写完之后只要有数据发送过来,这边会自动进行接收打印。
是用mqtt网页版图形化界面进行模拟数据发送。
安装mqtt后打开此网站:http://localhost:18083/
默认账号是:admin / public
登录后这边可以设置中文:

模拟发送:这几个地方不用改动但是一定要点击绿色的连接才可以,进行发送。

需要修改的部分是:

然后点击发送就可以收到信息了。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
