javascript技巧

关注公众号 jb51net

关闭
首页 > 网络编程 > JavaScript > javascript技巧 > js mqtt实时通信

基于JavaScript的MQTT实时通信应用开发实战记录

作者:写段代码给党看

MQTT是轻量级发布/订阅协议,专为物联网等低带宽场景设计,支持QoS分级、遗嘱消息等特性,本文通过JavaScript库实现客户端,涵盖连接管理、消息处理及优化方案,适用于实时通信应用开发,本文给大家介绍基于JavaScript的MQTT实时通信应用开发,感兴趣的朋友一起看看吧

MQTT 协议入门与实践:使用 JavaScript 构建实时通信应用

1. 什么是 MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的 发布/订阅(Pub-Sub) 消息协议,专为低带宽、高延迟或不稳定的网络环境设计。它广泛应用于物联网(IoT)、即时通讯、远程监控等场景。

核心特性:

2. MQTT 基础概念

术语说明
Broker消息代理服务器(如 Mosquitto、EMQX),负责转发消息。
Topic消息的分类标识(如 sensor/temperature),支持通配符 +#
QoS消息质量等级:
0 - 最多一次(可能丢失)
1 - 至少一次(可能重复)
2 - 恰好一次(可靠但开销大)
Client发布或订阅消息的设备或应用。

3. 实战:用 JavaScript 实现 MQTT 客户端

以下是一个基于 mqtt.js 库的封装类(代码来自提供的 mqttClient.js):

功能亮点:

  1. 自动重连机制
    this.client.on('reconnect', () => {
        if (++this.reconnectCount >= this.maxReconnectAttempts) {
            this.client.end(); // 超过最大重试次数后放弃
        }
    });
    
  2. Promise 封装连接
    connect() {
        return new Promise((resolve, reject) => {
            this.client.on('connect', resolve);
            this.client.on('error', reject);
        });
    }
    
  3. 安全的资源释放
    disconnect() {
        if (this.client?.connected) {
            this.client.end(); // 避免内存泄漏
        }
    }
    

使用示例:

const mqttClient = new MqttClient('mqtt://broker.emqx.io');
// 连接并订阅
mqttClient.connect().then(() => {
    mqttClient.subscribe('home/sensor', (topic, message) => {
        console.log(`[${topic}] ${message}`);
    });
    // 发布消息
    mqttClient.publish('home/light', 'ON');
});
// 页面卸载时断开连接
window.addEventListener('beforeunload', () => mqttClient.disconnect());

4. 常见问题与优化建议

❗ 问题 1:消息重复接收

原因:QoS 1 可能导致重复消息。
解决:在回调函数中实现幂等处理(如消息 ID 去重)。

❗ 问题 2:连接不稳定

优化

❗ 安全问题

5. 扩展应用场景

结语

MQTT 的轻量级和灵活性使其成为实时通信的理想选择。通过本文的封装类,你可以快速集成 MQTT 到你的 JavaScript 项目中。建议进一步探索:

提示:在浏览器中使用时,需注意跨域问题和 WebSocket 支持!

完整代码示例

/**
 * 
 * 调用方式如下
 * const mqttClient = new MqttClient('mqtt://your-broker-url');
 * mqttClient.connect();
 * mqttClient.subscribe('your/topic', (topic, message) => {
 * console.log(`${topic}: ${message}`);
 * });
 * 
 * 在页面销毁时断开连接,释放资源(防止骂娘)
 * mqttClient.disconnect();
 * */
   import mqtt from 'mqtt';
// 定义MqttClient类
class MqttClient {
	// 构造函数,接收broker的URL和客户端ID(可选)
	constructor(brokerUrl, clientId = `client-${Math.random().toString(16).substr(2, 8)}`, username = '', password = '') {
		this.brokerUrl = brokerUrl; // 存储broker的URL
		this.clientId = clientId; // 存储客户端ID,如果没有提供则生成一个随机的
		this.client = null; // 初始化mqtt客户端为null
		this.reconnectCount = 0; // 添加重连计数器
		this.maxReconnectAttempts = 5; // 设置最大重连次数
		this.username = username;
		this.password = password;
	}
	/**
	 * 连接到MQTT broker的方法,返回Promise
	 */
	connect() {
		return new Promise((resolve, reject) => {
			const options = {
				clientId: this.clientId,
				clean: true,
				connectTimeout: 4000,
				reconnectPeriod: 1000,
				username: this.username, // 添加用户名配置
				password: this.password // 添加密码配置
			};
			this.client = mqtt.connect(this.brokerUrl, options);
			this.client.on('connect', () => {
				console.log('M_connected');
				this.reconnectCount = 0; // 连接成功后重置计数器
				resolve(true);
			});
			this.client.on('error', (error) => {
				console.error('M_error:', error);
				reject(error);
			});
			this.client.on('close', () => {
				console.log('M_connection closed');
				reject(new Error('Connection closed'));
			});
			this.client.on('offline', () => {
				console.log('M_client offline');
				reject(new Error('Client offline'));
			});
			this.client.on('reconnect', () => {
				this.reconnectCount++;
				console.log(`M_reconnecting... (Attempt ${this.reconnectCount}/${this.maxReconnectAttempts})`);
				if (this.reconnectCount >= this.maxReconnectAttempts) {
					this.client.end(); // 达到最大重连次数后断开连接
					reject(new Error('Max reconnection attempts reached'));
				}
			});
		});
	}
	/**
	 * 订阅主题的方法,接收主题和回调函数作为参数
	 * @param {string} topic 
	 * @param {fun} callback 
	 */
	subscribe(topic, callback) {
		this.client.subscribe(topic, { qos: 1 }, (error) => { // 使用qos 1保证消息至少被传递一次,2 网络开销较大,不推荐用 0
			if (error) {
				console.error('Subscribe error:', error);
			} else {
				// 监听消息事件,当收到消息时调用回调函数,receivedTopic与订阅主题匹配的主题
				this.client.on('message', (receivedTopic, message) => {
					callback(receivedTopic, message.toString()); // 调用回调函数处理消息
				});
			}
		});
	}
	/**
	 * 发布消息到指定主题的方法,接收主题、消息和qos(可选)作为参数
	 * @param {string} topic 
	 * @param {string|Buffer} message 
	 * @param {number} qos 
	 */
	publish(topic, message, qos = 1) {
		this.client.publish(topic, message, { qos }, (error) => { // 使用指定的qos发布消息
			if (error) {
				console.error('Publish error:', error);
			} else {
				console.log(`Published message to topic: ${topic}`);
			}
		});
	}
	/**
	 * 断开与MQTT broker连接的方法
	 */
	disconnect() {
		// this.client.end();
		if (this.client?.connected) {
			this.client.end(); // 断开连接
		}
	}
}
export default MqttClient;

到此这篇关于基于JavaScript的MQTT实时通信应用开发实战记录的文章就介绍到这了,更多相关js mqtt实时通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文