java代码mqtt接收发送消息方式
作者:其妙的太空人
这篇文章主要介绍了java代码mqtt接收发送消息方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
java代码mqtt接收发送消息
mqtt消息第一用到不是太熟悉所以写一篇文章巩固一下。
前提是你已经把mqtt已经安装好,并且启动好了。
首先我们需要两部分代码。
所需依赖
<!-- mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
连接mqtt部分的代码块,因为我不需要发送消息所以把发送消息给注释掉了。
package mqttclient.util; import lombok.extern.slf4j.Slf4j; import mqttclient.callback.MqttMessageCallback2; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Objects; @Component @Slf4j public class MqttClientUtil2 { private String username; private String password; @Value("tcp://127.0.0.1:1883")//这个是安装mqtt的ip以及端口,1883是mqtt默认端口 private String host; @Value("CYT")//这个随便写但是是唯一的。 private String clientId; @Value("cyt/#")这个是mqtt发送消息的咱们要订阅的topic,cyt/#代表以cyt/开始的所有topic都接收 private String topic; @Value("${mqtt.connection.timeout}")//IOT_MQTT_Yield会block住timeout的时间去尝试接收数据,直到timeout才会退出。可以写在这里也可以写在yml配置文件中 private int timeOut; @Value("${mqtt.keep.alive.interval}") private int interval; @Autowired private MqttMessageCallback2 mqttMessageCallback2; private MqttClient mqttClient; private MqttConnectOptions mqttConnectOptions; @PostConstruct private void init(){ connect(host, clientId,topic); } /** * 链接mqtt * @param host * @param clientId */ private void connect(String host,String clientId,String topic){ try{ mqttClient = new MqttClient(host,clientId,new MemoryPersistence()); mqttConnectOptions = getMqttConnectOptions(); //设置回调函数 mqttClient.setCallback(mqttMessageCallback2); //链接mqtt mqttClient.connect(mqttConnectOptions); //订阅消息 mqttClient.subscribe(topic,2); }catch (Exception e){ log.error("mqtt服务链接异常!"); e.printStackTrace(); } } /** * 设置链接对象信息 * setCleanSession true 断开链接即清楚会话 false 保留链接信息 离线还会继续发消息 * @return */ private MqttConnectOptions getMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); /*mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray());*/ mqttConnectOptions.setServerURIs(new String[]{host}); mqttConnectOptions.setKeepAliveInterval(interval); mqttConnectOptions.setConnectionTimeout(timeOut); mqttConnectOptions.setCleanSession(true); return mqttConnectOptions; } /** *mqtt链接状态 * @return */ private boolean isConnect(){ if(Objects.isNull(this.mqttClient)){ return false; } return mqttClient.isConnected(); } /** * 设置重连 * @throws Exception */ private void reConnect() throws Exception{ if(Objects.nonNull(this.mqttClient)){ log.info("mqtt 服务已重新链接..."); this.mqttClient.connect(this.mqttConnectOptions); } } /** * 断开链接 * @throws Exception */ private void closeConnect() throws Exception{ if(Objects.nonNull(this.mqttClient)){ log.info("mqtt 服务已断开链接..."); this.mqttClient.disconnect(); } } /* *//** * 发布消息 * @param topic * @param message * @param qos * @throws Exception *//* public void sendMessage(String topic,String message,int qos) throws Exception { if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(message.getBytes()); mqttMessage.setQos(qos); MqttTopic mqttTopic = mqttClient.getTopic(topic); if(Objects.nonNull(mqttTopic)){ try{ MqttDeliveryToken publish = mqttTopic.publish(mqttMessage); if(publish.isComplete()){ log.info("消息发送成功---->{}",message); } }catch(Exception e){ log.error("消息发送异常",e); } } }else{ reConnect(); } }*/ }
接收消息部分
package mqttclient.callback; import lombok.extern.slf4j.Slf4j; import mqttclient.util.ParsingData2; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; import java.util.List; @Component @Slf4j public class MqttMessageCallback2 implements MqttCallback { /** * 链接丢失时处理 * @param throwable */ @Override public void connectionLost(Throwable throwable) { //可以做重连 或者 其他业务处理 } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { System.out.println("接收到消息topic---->{}"+topic); System.out.println("接收到消息topic---->{}"+mqttMessage); log.info("接收到消息质量qos---->{}",mqttMessage.getQos()); System.out.println("接收到消息质量qos---->{}"+mqttMessage.getQos()); log.info("接收到消息具体信息---->{}",new String(mqttMessage.getPayload())); System.out.println("接收到消息具体信息---->{}"+mqttMessage.getPayload()); //结合业务 编写具体信息即可 } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }
这个两个写完之后只要有数据发送过来,这边会自动进行接收打印。
是用mqtt网页版图形化界面进行模拟数据发送。
安装mqtt后打开此网站:http://localhost:18083/
默认账号是:admin / public
登录后这边可以设置中文:
模拟发送:这几个地方不用改动但是一定要点击绿色的连接才可以,进行发送。
需要修改的部分是:
然后点击发送就可以收到信息了。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。