WebSocket中心跳检测与断开重连机制详解
作者:hktk_wb
心跳检测是指在WebSocket连接过程中定时向服务端发送和接收心跳消息,来确定当前连接是否是正常状态的检测机制,断开重连是指在WebSocket不正常断开连接后,进行重新连接的策略,下面我们就来看看这二者的具体实现吧
心跳检测
在长时间WebSocket
连接过程中,容易因为网络或其他原因导致WebSocket
断开而没有触发ws.onclose
事件,导致无法获取WebSocket
连接失败的状态。
利用心跳检测可以预防这一问题。心跳检测是指在WebSocket
连接过程中定时向服务端发送和接收心跳消息,来确定当前连接是否是正常状态的检测机制。
断开重连
断开重连是指在WebSocket
不正常断开连接后,进行重新连接的策略。
在WebSocket
连接断开后,立马进行重新连接。如果连接失败,则间隔一定时间后再尝试重连,直到连接成功或者达到最大重连次数。
消息订阅
在WebSocket
使用过程中,可能会在不同的页面或逻辑中需要订阅全部或部分WebSocket
消息,利用消息订阅可以让使用者在不同页面中订阅获取相关的消息信息。
代码实现
定义WebSocket内部参数
this.url = url //WebSocket连接地址 this.ws = null //WebSocket连接对象 this.heartEnabled = heartEnabled //是否开启心跳 this.heartInterval = 10000 //心跳间隔时间 this.heartTimeout = 5000 //心跳超时时间 this.lockReconnect = false //是否禁止重连 this.heartTimer = null //心跳定时器 this.serverTimer = null //服务器超时定时器 this.reconnectCount = 0 //重连次数 this.maxReconnectCount = 5 //最大重连次数 this.observers = [] //消息订阅者列表 this.waitingMessages = [] //待执行命令列表
实现WebSocket连接
//WebSocket连接 connect() { this.ws = new WebSocket(this.url) this.ws.onopen = () => { this.reconnectCount = 0 // 重置重连次数 this.heartEnabled && this.start() // 开启心跳 // 发送所有等待发送的信息 const length = this.waitingMessages.length for (let i = 0; i < length; ++i) { const message = this.waitingMessages.shift() this.send(message) } } this.ws.onclose = (event) => { console.log('WebSocket closed:', event) this.reconnect() //这里可根据具体情况判定是否重新连接 } this.ws.onerror = (error) => { console.log('WebSocket error:', error) this.reconnect() } this.ws.onmessage = (event) => { //收到心跳信息则重置心跳,收到其他信息则触发回调 if (event.data === 'pong') { this.start() } else { this.observers.forEach((observer) => { //如需要,这里可根据observer的订阅type来判断是否回调 observer.callback(event.data) }) } } }
消息订阅及发送
//发送信息 send(message) { //发送信息时若WebSocket还未连接,则将信息放入待发送信息中等待连接成功后发送 if (this.onReady() !== WebSocket.OPEN) { this.waitingMessages.push(message) return this } this.ws.send(message) return this } //订阅WebSocket信息 onObserve(callback, type = 'all') { const observer = { type, callback } this.observers.push(observer) return observer } //取消订阅信息 cancelObserve(cancelObserver) { this.observers.forEach((observer, index) => { if (cancelObserver === observer) { this.observers.splice(index, 1) } }) } //WebSocket连接状态 onReady() { return this.ws.readyState }
心跳检测
//开启心跳 start() { this.reset() this.heartTimer = setTimeout(() => { this.send('ping') //达到心跳超时时间还没有返回心跳信息,则认为连接断开,关闭WebSocket并重连 this.serverTimer = setTimeout(() => { this.ws.close() }, this.heartTimeout) }, this.heartInterval) } //重置心跳定时器/服务超时定时器 reset() { this.heartTimer && clearTimeout(this.heartTimer) this.serverTimer && clearTimeout(this.serverTimer) }
断开重连
//重连 reconnect() { // 设置lockReconnect变量避免重复连接 if (this.lockReconnect || this.reconnectCount >= this.maxReconnectCount) return this.lockReconnect = true this.reconnectCount++ //重连次数+1 setTimeout(() => { this.connect() this.lockReconnect = false }, 1000 * this.reconnectCount) //重连次数越多,延时越久 }
完整代码 js
class SocketConnect { constructor(url = 'ws://127.0.0.1:8080', heartEnabled = true) { this.url = url //WebSocket连接地址 this.ws = null //WebSocket连接对象 this.heartEnabled = heartEnabled //是否开启心跳 this.heartInterval = 10000 //心跳间隔时间 this.heartTimeout = 5000 //心跳超时时间 this.lockReconnect = false //是否禁止重连 this.heartTimer = null //心跳定时器 this.serverTimer = null //服务器超时定时器 this.reconnectCount = 0 //重连次数 this.maxReconnectCount = 5 //最大重连次数 this.observers = [] //消息订阅者列表 this.waitingMessages = [] //待执行命令列表 this.connect() } //WebSocket连接 connect() { this.ws = new WebSocket(this.url) this.ws.onopen = () => { this.reconnectCount = 0 // 重置重连次数 this.heartEnabled && this.start() // 开启心跳 // 发送所有等待发送的信息 const length = this.waitingMessages.length for (let i = 0; i < length; ++i) { const message = this.waitingMessages.shift() this.send(message) } } this.ws.onclose = (event) => { console.log('WebSocket closed:', event) this.reconnect() } this.ws.onerror = (error) => { console.log('WebSocket error:', error) this.reconnect() } this.ws.onmessage = (event) => { //收到心跳信息则重置心跳,收到其他信息则触发回调 if (event.data === 'pong') { this.start() } else { this.observers.forEach((observer) => { observer.callback(event.data) }) } } } //发送信息 send(message) { //发送信息时若WebSocket还未连接,则将信息放入待发送信息中等待连接成功后发送 if (this.onReady() !== WebSocket.OPEN) { this.waitingMessages.push(message) return this } this.ws.send(message) return this } //订阅webSocket信息 onObserve(callback, type = 'all') { const observer = { type, callback } this.observers.push(observer) return observer } //取消订阅信息 cancelObserve(cancelObserver) { this.observers.forEach((observer, index) => { if (cancelObserver === observer) { this.observers.splice(index, 1) } }) } //开启心跳 start() { this.reset() this.heartTimer = setTimeout(() => { this.send('ping') //5秒钟还没有返回心跳信息,则认为连接断开,关闭WebSocket并重连 this.serverTimer = setTimeout(() => { this.ws.close() }, this.heartTimeout) }, this.heartInterval) } //重置心跳定时器/服务超时定时器 reset() { this.heartTimer && clearTimeout(this.heartTimer) this.serverTimer && clearTimeout(this.serverTimer) } //重连 reconnect() { // 设置lockReconnect变量避免重复连接 if (this.lockReconnect || this.reconnectCount >= this.maxReconnectCount) return this.lockReconnect = true this.reconnectCount++ //重连次数+1 setTimeout(() => { this.connect() this.lockReconnect = false }, 1000 * this.reconnectCount) //重连次数越多,延时越久 } //WebSocket连接状态 onReady() { return this.ws.readyState } } export default SocketConnect
使用示例
// WebSocket连接 const url = 'ws://127.0.0.1:8080' const ws = new SocketConnect(url) // 消息订阅 const observer = ws.onObserve((data) => { console.log('data:', data) }) // 取消订阅 ws.cancelObserve(observer) // 发送消息 ws.send('hello world')
完整代码 ts
type ObserverType = { type: string callback: (event: MessageEvent) => void } type MessageDataType = object class SocketConnect { private url: string private ws: WebSocket | undefined //WebSocket实例 private heartEnabled: boolean //是否开启心跳 private heartInterval = 10000 //心跳间隔时间 private heartTimeout = 5000 //心跳超时时间 private lockReconnect = false //是否禁止重连 private heartTimer: NodeJS.Timeout | undefined //心跳定时器 private serverTimer: NodeJS.Timeout | undefined //服务器超时定时器 private reconnectCount = 0 //重连次数 private maxReconnectCount = 5 //最大重连次数 private observers: ObserverType[] = [] //消息订阅者列表 private waitingMessages: string[] = [] //待执行命令列表 constructor(url = 'ws://127.0.0.1:8080', heartEnabled = false) { this.url = url this.heartEnabled = heartEnabled this.connect() } //WebSocket连接 connect() { this.ws = new WebSocket(this.url) this.ws.onopen = () => { this.reconnectCount = 0 // 重置重连次数 this.heartEnabled && this.start() // 开启心跳 // 发送所有等待发送的信息 const length = this.waitingMessages.length for (let i = 0; i < length; ++i) { const message = this.waitingMessages.shift() this.send(message) } } this.ws.onclose = (event) => { console.log('WebSocket closed:', event) this.reconnect() } this.ws.onerror = (error) => { console.log('WebSocket error:', error) this.reconnect() } this.ws.onmessage = (event: MessageEvent) => { //收到心跳信息则重置心跳,收到其他信息则触发回调 if (event.data === 'pong') { this.start() } else { this.observers.forEach((observer) => { observer.callback(event.data) }) } } } //发送信息 send(message?: string) { if (message) { //发送信息时若WebSocket还未连接,则将信息放入待发送信息中等待连接成功后发送 if (this.onReady() !== WebSocket.OPEN) { this.waitingMessages.push(message) return this } this.ws && this.ws.send(message) } return this } //订阅WebSocket信息 onObserve(callback: (data: MessageDataType) => void, type = 'all') { const observer = { type, callback } this.observers.push(observer) return observer } //取消订阅信息 cancelObserve(cancelObserver: ObserverType) { this.observers.forEach((observer, index) => { if (cancelObserver === observer) { this.observers.splice(index, 1) } }) } //开启心跳 private start() { this.reset() this.heartTimer = setTimeout(() => { this.send('ping') //5秒钟还没有返回心跳信息,则认为连接断开,关闭WebSocket并重连 this.serverTimer = setTimeout(() => { this.ws && this.ws.close() }, this.heartTimeout) }, this.heartInterval) } //重连 private reconnect() { // 设置lockReconnect变量避免重复连接 if (this.lockReconnect || this.reconnectCount >= this.maxReconnectCount) return this.lockReconnect = true this.reconnectCount++ //重连次数+1 setTimeout(() => { this.connect() this.lockReconnect = false }, 1000 * this.reconnectCount) //重连次数越多,延时越久 } // 重置心跳定时器/服务超时定时器 private reset() { this.heartTimer && clearTimeout(this.heartTimer) this.serverTimer && clearTimeout(this.serverTimer) } // WebSocket连接状态 onReady() { return this.ws && this.ws.readyState } } export default SocketConnect
以上就是WebSocket中心跳检测与断开重连机制详解的详细内容,更多关于WebSocket心跳检测与断开重连的资料请关注脚本之家其它相关文章!