Python使用WebSocket和SSE实现HTTP服务器消息推送方式
作者:slp_44777680
很多时候我们需要实时获取最新数据,但是传统意义上的HTTP请求,必须由客户端向服务端发起请求,服务端再返回相应的数据。
那如果我们需要获取实时数据,就要通过HTTP轮询,客户端不间断的向服务器发起请求。
这样不断的的请求不但严重加大服务器的压力,还可能因为网络延迟而影响数据的时效性。
下面介绍两种方法能够很好的满足业务的需求。
一、WebSocket
WebSocket是HTML5开始提供的一种在单个 TCP 链接上进行全双工通信的协议。
- 优点:双工通信
- 缺点:需专门定义数据协议,解析数据流,且部分服务器支持不完善,后台例如java spring boot 2.1.2 仅支持websocket 1.0(最高已达1.3)
1.客户端代码
python 3+ 代码
#python 3+ import threading import websocket class Client: def __init__(self,url): self.url = url self.ws = None self.enable = True def on_message(self,response): self.enable = False print(response) def on_error(self,error): # print(ws) print(error) def on_close(self): self.enable = True print(ws) def on_open(self): print('open') def start_func(self): while self.enable: websocket.enableTrace(True) self.ws = websocket.WebSocketApp(self.url, on_message=self.on_message, # on_data=on_data, on_error=self.on_error, on_open=self.on_open, on_close=self.on_close, ) self.ws.run_forever(ping_interval=60, ping_timeout=5) if __name__ == "__main__": cli = Client(url = 'wss://api.zb.live/websocket' ) t1 = threading.Thread(target=cli.start_func_zb) t1.start()
javascript 代码
var ws = new WebSocket("wss://echo.websocket.org"); ws.onopen = function(evt) { console.log("Connection open ..."); ws.send("Hello WebSockets!"); }; ws.onmessage = function(evt) { console.log( "Received Message: " + evt.data); ws.close(); }; ws.onclose = function(evt) { console.log("Connection closed."); };
2.服务端代码
from websocket_server import WebsocketServer class WSSocketObj: def __init__(self,host=None,port = 8131): self.host = host if host else '127.0.0.1' self.port = port # 当新的客户端连接时会提示 def new_client(self,client, server,): print("当新的客户端连接时会提示:%s" % client['id']) dd = 122 server.send_message_to_all("Hey all, a new client has joined us") # 当旧的客户端离开 def client_left(self,client, server): print("客户端%s断开" % client['id']) # 接收客户端的信息。 def message_received(self,client, server, message): print("Client(%d) said: %s" % (client['id'], message)) # server.send_message_to_all(message) #发送消息给 全部客户端 server.send_message(client, 'hello,client') # 发送消息给指定客户端 def run_server(self): server = WebsocketServer(self.port, self.host) server.set_fn_new_client(self.new_client) server.set_fn_client_left(self.client_left) server.set_fn_message_received(self.message_received) server.run_forever() if __name__ == '__main__': WSSocketObj().run_server()
二、SSE(Server-Sent Events,服务器发送事件)
SSE ( Server-sent Events )通俗解释起来就是一种基于HTTP的,以流的形式由服务端持续向客户端发送数据的技术,是 WebSocket 的一种轻量代替方案。
- 优点:开发简单,和传统的http开发几乎无任何差别,客户端开发简单,有标准支持(EventSource)
- 缺点:和websocket相比,只能单工通信,建立连接后,只能由服务端发往客户端,且占用一个连接,如需客户端向服务端通信,需额外打开一个连接
1.客户端代码
python
# 第一种方式 def sse_client(): """ pip install sseclient-py 只对于返回标准SSE格式的请求可用 格式:event: {EVENT}\nretry: 10000\ndata: {DATA}\n\n :return: """ import requests # res = requests.request('get', url, json=data, stream=True, headers={'Accept': 'text/event-stream'}) client = requests.post(url, json=data, stream=True, headers={'Accept': 'text/event-stream'}) client = sseclient.SSEClient(client) for i in client.events(): print(i.data) # 第二种方式 def sse_with_requests(): headers = {"Accept": "text/event-stream"} r = requests.post(url, headers=headers, json=data, stream=True) r.encoding = 'utf-8' for chunk in r.iter_content(chunk_size=None, decode_unicode=True): # 处理接收到的数据块 print("Received:", chunk)
javascript
第一种方式:
//判断是否支持SSE if('EventSource' in window){ //初始化SSE var url="http:localhost:8000/stream"; var source=new EventSource(url); // 连接成功后会触发 open 事件 source.onopen=(event)=>{ console.log("开启SSE"); } // 服务器发送信息到客户端时,如果没有 event 字段,默认会触发 message 事件 source.onmessage=(event)=>{ var data=event.data; $("body").append($("<p>").text(data)); } //监听like事件 source.addEventListener('like',function(event){ var data=event.data; $("body").append($("<p>").text(data)); },false); // 连接异常时会触发 error 事件并自动重连 source.onerror=(event)=>{ console.log(event); }
第二种方式:使用 addEventListener 方法来添加相应的事件处理方法
if (window.EventSource) { // 创建 EventSource 对象连接服务器 const source = new EventSource('http://localhost:2000'); // 连接成功后会触发 open 事件 source.addEventListener('open', () => { console.log('Connected'); }, false); // 服务器发送信息到客户端时,如果没有 event 字段,默认会触发 message 事件 source.addEventListener('message', e => { console.log(`data: ${e.data}`); }, false); // 自定义 EventHandler,在收到 event 字段为 slide 的消息时触发 source.addEventListener('slide', e => { console.log(`data: ${e.data}`); // => data: 7 }, false); // 连接异常时会触发 error 事件并自动重连 source.addEventListener('error', e => { if (e.target.readyState === EventSource.CLOSED) { console.log('Disconnected'); } else if (e.target.readyState === EventSource.CONNECTING) { console.log('Connecting...'); } }, false); } else { console.error('Your browser doesn\'t support SSE'); }
EventSource从父接口 EventTarget
中继承了属性和方法,其内置了 3 个 EventHandler
属性、2 个只读属性和 1 个方法:
EventHandler 属性
EventSource.onopen
在连接打开时被调用。EventSource.onmessage
在收到一个没有 event 属性的消息时被调用。EventSource.onerror
在连接异常时被调用。 只读属性EventSource.readyState
一个 unsigned short 值,代表连接状态。可能值是 CONNECTING (0), OPEN (1), 或者 CLOSED (2)。EventSource.url
连接的 URL。 方法EventSource.close()
关闭连接
EventSource 对象的 onmessage
属性的作用类似于 addEventListener( ‘ message ’ )
2.服务端代码(基于Flask)
import json import time from flask import Flask, request from flask import Response from flask import render_template app = Flask(__name__) def get_message(): """this could be any function that blocks until data is ready""" time.sleep(1) s = time.ctime(time.time()) return json.dumps(['当前时间:' + s , 'a'], ensure_ascii=False) @app.route('/') def hello_world(): return render_template('index.html') @app.route('/stream') def stream(): user_id = request.args.get('user_id') print(user_id) def eventStream(): id = 0 while True: id +=1 # wait for source data to be available, then push it yield 'id: {}\nevent: add\ndata: {}\n\n'.format(id,get_message()) return Response(eventStream(), mimetype="text/event-stream") if __name__ == '__main__': app.run()
因为SSE是http请求,可是又限定是一个长链接,因此要设置MIME类型为text/event-stream。返回的为字符串。
消息的格式
服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本;
每一次发送的信息,由若干个message组成,每一个message之间用\n\n分隔。每一个message内部由若干行组成
- 格式
[field]:value\n
其中在规范中为消息定义了 4 个字段
- id 表明id
- event 表明消息的事件类型
- data 消息的数据字段
- retry 客户端重连的时间。只接受整数,单位是毫秒。如果这个值不是整数则会被自动忽略
需要注意的是,id字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子客户端就不会存在 Last-Event-ID这个属性。所以为了保证数据可靠,我们需要在每条消息上带上 id 字段。
一个很有意思的地方是,规范中规定以冒号开头的消息都会被当作注释,一条普通的注释(:\n\n
)对于服务器来说只占 5个字符,但是发送到客户端上的时候不会触发任何事件,这对客户端来说是非常友好的。所以注释一般被用于维持服务器和客户端的长连接。
3.SSE使用注意事项
1、SSE 如何保证数据完整性
客户端在每次接收到消息时,会把消息的 id 字段作为内部属性 Last-Event-ID 储存起来。
SSE 默认支持断线重连机制,在连接断开时会 触发 EventSource 的 error 事件,同时自动重连。再次连接成功时EventSource 会把 Last-Event-ID 属性作为请求头发送给服务器,这样服务器就可以根据这个 Last-Event-ID作出相应的处理。
这里需要注意的是,id 字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子客户端就不会存在 Last-Event-ID这个属性。所以为了保证数据可靠,我们需要在每条消息上带上 id 字段。
2、减少开销
在 SSE 的草案中提到,“text/event-stream” 的 MIME 类型传输应当在静置 15秒后自动断开。在实际的项目中也会有这个机制,但是断开的时间没有被列入标准中。
为了减少服务器的开销,我们也可以有目的的断开和重连。
简单的办法是服务器发送一个 关闭消息并指定一个重连的时间戳,客户端在触发关闭事件时关闭当前连接并创建 一个计时器,在重连时把计时器销毁。
function connectSSE() { if (window.EventSource) { const source = new EventSource('http://localhost:2000'); let reconnectTimeout; source.addEventListener('open', () => { console.log('Connected'); clearTimeout(reconnectTimeout); }, false); source.addEventListener('pause', e => { source.close(); const reconnectTime = +e.data; const currentTime = +new Date(); reconnectTimeout = setTimeout(() => { connectSSE(); }, reconnectTime - currentTime); }, false); } else { console.error('Your browser doesn\'t support SSE'); } } connectSSE();
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。