Python 实现 WebSocket 通信的过程详解
作者:微软技术分享
WebSocket是一种在Web应用程序中实现双向通信的协议。与传统的HTTP请求-响应模型不同,WebSocket允许服务器主动向客户端推送数据,实现实时性和互动性。
以下是WebSocket的一些关键特点:
双向通信:WebSocket提供了全双工通信,允许服务器和客户端同时发送和接收数据。相比起传统的HTTP请求-响应模型,WebSocket使得服务器可以主动推送数据给客户端,而不需要客户端首先发起请求。
持久连接:与HTTP请求不同,WebSocket连接是持久的,即一旦建立连接,它将保持打开状态,直到其中一方关闭连接或发生错误。
轻量级协议:WebSocket使用简单的消息传递协议,在传输数据时减少了额外的开销。它采用二进制帧格式或文本帧格式来发送数据。
跨域支持:WebSocket支持跨域通信,允许在不同域名或端口之间建立连接和交换数据。
WebSocket在许多场景下非常有用,特别是实时通信和实时数据更新方面。它被广泛应用于在线聊天、实时游戏、股票市场行情、协同编辑和实时协作等应用程序中。
在编程中,可以使用WebSocket API来建立WebSocket连接,并使用WebSocket的事件和方法来处理连接的打开、关闭、错误和消息等操作。常见的编程语言和框架提供了对WebSocket的支持,使得开发者可以方便地实现WebSocket功能。
实现简单通信
这段代码实现了一个简单的WebSocket客户端,通过与服务器建立连接,可以实现实时的双向通信。需要注意的是,连接的URL和端口需要根据实际情况进行修改,确保与服务器端的WebSocket服务端口一致。
<!DOCTYPE html> <html> <head lang="en"> <meta charset="UTF-8"> <script src="https://cdn.lyshark.com/jquery/3.5.1/jquery.min.js"></script> </head> <body> <ul id="content"></ul> <form class="form"> <input type="text" placeholder="请输入发送的消息" class="message" id="message"/> <input type="button" value="连接" id="connect" class="connect"/> <input type="button" value="发送" id="send" class="connect"/> </form> <script type="text/javascript"> var oUl=document.getElementById('content'); var oConnect=document.getElementById('connect'); var oSend=document.getElementById('send'); var websocket=null; oConnect.onclick=function(){ websocket=new WebSocket('ws://127.0.0.1:10083'); <!--客户端链接后触发--> websocket.onopen=function(){ oUl.innerHTML+="<li>客户端已连接</li>"; } <!--收到消息后触发--> websocket.onmessage=function(evt){ oUl.innerHTML+="<li>"+evt.data+"</li>"; } <!--关闭后触发--> websocket.onclose=function(){ oUl.innerHTML+="<li>客户端已断开连接</li>"; }; <!--出错后触发--> websocket.onerror=function(evt){ oUl.innerHTML+="<li>"+evt.data+"</li>"; }; }; oSend.onclick=function(){ if(websocket){ websocket.send($("#message").val()) } } </script> </body> </html>
后端的main.py执行处理任务,主要处理流程集中在handler_msg函数上,如下这段代码实现了一个简单的WebSocket服务器。它使用了Python的socket和threading模块来创建服务器,并处理与客户端的握手和数据交互。
代码中的主要函数和功能包括:
get_headers(data)
: 从请求数据中获取请求头部信息,并将其转换为字典格式。
parse_payload(payload)
: 对接收到的数据进行解码,还原出原始消息。
send_msg(conn, msg_bytes)
: 封装并发送数据到浏览器。
recv_msg(conn)
: 从浏览器中接收数据,并解析出原始消息。
handler_accept(sock)
: 建立握手流程,处理与客户端的连接和握手过程。在握手完成后,创建一个新的线程来处理与客户端的数据交互。
handler_msg(connect)
: 处理与客户端的数据交互。通过循环不断接收客户端发送的消息,并发送一个固定的回复消息。
主函数部分:创建一个socket对象并绑定到本地主机的端口10083上,然后开始监听客户端的连接请求。当有新的连接请求时,将其分配给handler_accept()
函数进行处理,并在新线程中处理与客户端的数据交互。
该代码实现了一个简单的WebSocket服务器,可以接收来自客户端的连接请求,并与客户端进行数据交互。需要注意的是,服务器绑定的IP地址和端口号可以根据实际需求进行修改。
import socket,struct,hashlib,base64 import threading # 获取请求头部数据,并将请求头转换为字典 def get_headers(data): headers = {} data = str(data, encoding="utf-8") header, body = data.split("\r\n\r\n", 1) header_list = header.split("\r\n") for i in header_list: i_list = i.split(":", 1) if len(i_list) >= 2: headers[i_list[0]] = "".join(i_list[1::]).strip() else: i_list = i.split(" ", 1) if i_list and len(i_list) == 2: headers["method"] = i_list[0] headers["protocol"] = i_list[1] print("请求类型: {} 请求协议: {}".format(i_list[0],i_list[1])) return headers # 接收数据时的解码过程 def parse_payload(payload): payload_len = payload[1] & 127 if payload_len == 126: mask = payload[4:8] decoded = payload[8:] elif payload_len == 127: mask = payload[10:14] decoded = payload[14:] else: mask = payload[2:6] decoded = payload[6:] # 将所有数据全部收集起来,对所有字符串编码 bytes_list = bytearray() for i in range(len(decoded)): chunk = decoded[i] ^ mask[i % 4] bytes_list.append(chunk) body = str(bytes_list, encoding='utf-8') return body # 封装并发送数据到浏览器 def send_msg(conn, msg_bytes): # 接收的第一个字节都是x81不变 first_byte = b"\x81" length = len(msg_bytes) if length < 126: first_byte += struct.pack("B", length) elif length <= 0xFFFF: first_byte += struct.pack("!BH", 126, length) else: first_byte += struct.pack("!BQ", 127, length) msg = first_byte + msg_bytes conn.sendall(msg) return True # 从浏览器中接收数据 def recv_msg(conn): data_recv = conn.recv(8096) if data_recv[0:1] == b"\x81": data_parse = parse_payload(data_recv) return data_parse return False # 建立握手流程并创建 handler_msg 完成数据收发 def handler_accept(sock): while True: conn, addr = sock.accept() data = conn.recv(8096) headers = get_headers(data) # 对请求头中的sec-websocket-key进行加密 response_tpl = "HTTP/1.1 101 Switching Protocols\r\n" \ "Upgrade:websocket\r\n" \ "Connection: Upgrade\r\n" \ "Sec-WebSocket-Accept: %s\r\n" \ "WebSocket-Location: ws://%s\r\n\r\n" # 加盐操作,此处是H5规范定义好的 magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' if headers.get('Sec-WebSocket-Key'): value = headers['Sec-WebSocket-Key'] + magic_string # 对数据进行加解密 ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest()) response_str = response_tpl % (ac.decode('utf-8'), headers.get("Host")) # 相应握手包数据 conn.sendall(bytes(response_str, encoding="utf-8")) t = threading.Thread(target=handler_msg, args=(conn, )) t.start() # 主函数,用于实现数据交互 def handler_msg(connect): with connect as connect_ptr: while True: try: recv = recv_msg(connect_ptr) print("接收数据: {}".format(recv)) send_msg(connect_ptr, bytes("hello lyshark", encoding="utf-8")) except Exception: exit(0) if __name__ == "__main__": sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(("127.0.0.1", 10083)) sock.listen(5) t = threading.Thread(target=handler_accept(sock)) t.start()
绘制动态图形
如下这段代码实现了一个简单的实时CPU数据展示页面。通过WebSocket与服务器建立连接,并实时接收和展示来自服务器的CPU数据。需要注意的是,页面中WebSocket连接的IP地址和端口号需要根据实际情况进行修改,确保与服务器建立正确的连接。另外,使用ECharts库来绘制折线图需要保证页面能够正确加载ECharts的JavaScript文件。
代码中的主要部分包括:
引入ECharts库:使用<script>
标签引入ECharts库的JavaScript文件,以便在页面中使用ECharts。
HTML结构:在页面中创建一个<div>
元素,用于显示折线图。
JavaScript代码:定义了一个名为display
的函数,用于初始化和更新折线图。该函数接收两个参数:时间数组和CPU数据数组。通过调用ECharts的setOption
方法,配置折线图的样式和数据,并将其显示在指定的<div>
元素中。
WebSocket通信:创建一个WebSocket对象,与服务器建立WebSocket连接。通过监听onmessage
事件,接收服务器发送的CPU数据。将接收到的数据解析为JSON格式,将时间和CPU数据分别添加到对应的数组中。当时间数组和CPU数据数组的长度达到10时,删除数组中最旧的数据,并调用display
函数更新折线图。
<!DOCTYPE html> <html> <head lang="en"> <meta charset="UTF-8"> <script type="text/javascript" src="https://cdn.lyshark.com/echarts/5.0.0/echarts.min.js"></script> </head> <body> <center><div id="main" style="height:400px;width:90%;border:1px solid #eecc11;padding:10px;"></div></center> <script type="text/javascript" charset="UTF-8"> var display = function(time,cpu) { var main = echarts.init(document.getElementById(("main"))); var option = { xAxis: { type: 'category', data: time }, yAxis: { type: 'value' }, series: [{ type: 'line', smooth:0.3, symbol: 'none', color: 'blue', smooth: true, areaStyle: { color: '#0000CD', origin: 'start', opacity: 0.5 }, data: cpu }] }; main.setOption(option,true); }; </script> <script type="text/javascript" charset="UTF-8"> var ws=new WebSocket('ws://127.0.0.1:10083'); var time =["","","","","","","","","",""]; var cpu = [0,0,0,0,0,0,0,0,0,0]; ws.onmessage=function(evt) { var recv = JSON.parse(evt.data); time.push(recv.response[0]); cpu.push(parseFloat(recv.response[1])); if(time.length >=10){ time.shift(); cpu.shift(); console.log("时间:" + time + " --> CPU数据: " + cpu); display(time,cpu) } } </script> </body> </html>
后台部分我们主要代码不需要动,只需要修改handler_msg
处理流程即可,代码能够接受客户端的连接请求,并定时发送CPU数据给客户端。需要注意的是,代码中的sock.bind(("127.0.0.1", 10083))
绑定的是本地地址和指定的端口号,需要根据实际情况进行修改。另外,需要确保在服务器环境中安装了psutil
库,以便获取CPU使用率数据。
import socket,struct,hashlib,base64 import threading # 获取请求头部数据,并将请求头转换为字典 def get_headers(data): headers = {} data = str(data, encoding="utf-8") header, body = data.split("\r\n\r\n", 1) header_list = header.split("\r\n") for i in header_list: i_list = i.split(":", 1) if len(i_list) >= 2: headers[i_list[0]] = "".join(i_list[1::]).strip() else: i_list = i.split(" ", 1) if i_list and len(i_list) == 2: headers["method"] = i_list[0] headers["protocol"] = i_list[1] print("请求类型: {} 请求协议: {}".format(i_list[0],i_list[1])) return headers # 接收数据时的解码过程 def parse_payload(payload): payload_len = payload[1] & 127 if payload_len == 126: mask = payload[4:8] decoded = payload[8:] elif payload_len == 127: mask = payload[10:14] decoded = payload[14:] else: mask = payload[2:6] decoded = payload[6:] # 将所有数据全部收集起来,对所有字符串编码 bytes_list = bytearray() for i in range(len(decoded)): chunk = decoded[i] ^ mask[i % 4] bytes_list.append(chunk) body = str(bytes_list, encoding='utf-8') return body # 封装并发送数据到浏览器 def send_msg(conn, msg_bytes): # 接收的第一个字节都是x81不变 first_byte = b"\x81" length = len(msg_bytes) if length < 126: first_byte += struct.pack("B", length) elif length <= 0xFFFF: first_byte += struct.pack("!BH", 126, length) else: first_byte += struct.pack("!BQ", 127, length) msg = first_byte + msg_bytes conn.sendall(msg) return True # 从浏览器中接收数据 def recv_msg(conn): data_recv = conn.recv(8096) if data_recv[0:1] == b"\x81": data_parse = parse_payload(data_recv) return data_parse return False # 建立握手流程并创建 handler_msg 完成数据收发 def handler_accept(sock): while True: conn, addr = sock.accept() data = conn.recv(8096) headers = get_headers(data) # 对请求头中的sec-websocket-key进行加密 response_tpl = "HTTP/1.1 101 Switching Protocols\r\n" \ "Upgrade:websocket\r\n" \ "Connection: Upgrade\r\n" \ "Sec-WebSocket-Accept: %s\r\n" \ "WebSocket-Location: ws://%s\r\n\r\n" # 加盐操作,此处是H5规范定义好的 magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' if headers.get('Sec-WebSocket-Key'): value = headers['Sec-WebSocket-Key'] + magic_string # 对数据进行加解密 ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest()) response_str = response_tpl % (ac.decode('utf-8'), headers.get("Host")) # 相应握手包数据 conn.sendall(bytes(response_str, encoding="utf-8")) t = threading.Thread(target=handler_msg, args=(conn, )) t.start() # 主函数,用于实现数据交互 def handler_msg(conn): with conn as c: while True: try: times = time.strftime("%M:%S", time.localtime()) data = psutil.cpu_percent(interval=None, percpu=True) print("处理时间: {} --> 处理负载: {}".format(times, data)) send_msg(c, bytes(json.dumps({"response": [times, data]}), encoding="utf-8")) time.sleep(60) except Exception: exit(0) if __name__ == "__main__": sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(("127.0.0.1", 10083)) sock.listen(5) t = threading.Thread(target=handler_accept(sock)) t.start()
实现动态交互
新建index.html写入如下代码,这段代码实现了一个Web页面,包含了一个可交互的终端,可以通过输入地址范围和执行命令来批量执行命令,并实时显示执行结果。需要注意的是,代码中的WebSocket连接地址和端口号需要根据实际情况进行修改,确保与后端WebSocket服务器的地址和端口号一致。
代码中的主要部分包括:
引入CSS和JavaScript库:页面引入了xterm.css
和xterm.js
来加载和渲染终端的外观样式和功能,同时还引入了jQuery库用于简化DOM操作。
创建终端实例:通过new Terminal()
创建了一个终端实例term
,设置了终端的行数、列数、换行符转换、光标闪烁等属性。
WebSocket连接:通过new WebSocket()
创建了一个WebSocket实例sock
,并指定了与后端的WebSocket服务器的地址和端口号。
终端打开和消息接收:在sock.onopen
回调函数中,调用term.open()
打开终端,并在控制台输出WebSocket连接成功的消息。在sock.onmessage
回调函数中,解析接收到的JSON数据,提取地址和状态信息,并将其显示在终端中。
执行命令:通过$('#send_message').click()
事件处理函数,获取输入框中的地址和命令信息,将其封装成JSON格式,并通过WebSocket发送给后端。
<html> <head> <meta charSet="utf-8"> <meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1"> <link rel="stylesheet" href="https://cdn.lyshark.com/xterm/xterm.css" rel="external nofollow" /> <script src="https://cdn.lyshark.com/xterm/xterm.js"></script> <script src="https://cdn.lyshark.com/jquery/3.5.1/jquery.min.js" type="text/javascript"></script> </head> <body> <div id="terminal"></div> <input type="text" id="address" placeholder="主机范围 127.0.0.1-100" style="width:200px;height:40px"/> <input type="text" id="command" placeholder="执行命令 ls -lh " style="width:400px;height:40px"/> <input type="button" id="send_message" value="批量执行"> <!--实现格式化字符串--> <script type="text/javascript"> $.format = function(source, params) { if (arguments.length == 1) return function() { var args = $.makeArray(arguments); args.unshift(source); return $.format.apply(this, args); }; if (arguments.length > 2 && params.constructor != Array) { params = $.makeArray(arguments).slice(1); } if (params.constructor != Array) { params = [params]; } $.each(params, function(i, n) { source = source.replace(new RegExp("\\{" + i + "\\}", "g"), n); }); return source; }; </script> <!--打开终端,并开始执行命令--> <script type="text/javascript"> $(function(){ var window_width = $(window).width()-200; var window_height = $(window).height()-300; var term = new Terminal( { cols: Math.floor(window_width/9), rows: Math.floor(window_height/20), convertEol: true, cursorBlink:false, }); var sock = new WebSocket("ws://127.0.0.1:10083"); sock.onopen = function () { term.open(document.getElementById('terminal')); console.log('WebSocket Open'); }; sock.onmessage = function (recv) { var data = JSON.parse(recv.data); console.log(data['addr'] + ' -- ' + data['status']); var temp = "\x1B[1;3;35m 地址:[ {0} ] \x1B[0m --> \x1B[1;3;33m 状态:[ {1} ] \x1B[0m"; var string = $.format(temp, data['addr'],data['status']); term.writeln(string); }; $('#send_message').click(function () { var message ={"address":null,"command":null}; message['address'] = $("#address").val(); message['command'] = $("#command").val(); var send_data = JSON.stringify(message); window.sock.send(send_data); }); window.sock = sock; }); </script>
后端代码如下所示,其中核心代码handler_msg(conn),用于处理接收到的消息,解析地址范围,并针对每个地址执行命令。执行结果会通过WebSocket发送回客户端,并在后台打印出对应的主机和执行的命令。需要注意的是,代码中的函数recv_msg()
和send_msg()
没有提供具体的实现,你需要根据具体的需求自行实现这两个函数。
CalculationIP(Addr_Count)
函数用于解析地址范围,并生成对应的IP地址列表。参数Addr_Count
是一个地址范围字符串,如"127.0.0.1-100"
。函数首先根据"-"符号将地址范围字符串分割为起始IP和结束IP的两部分。然后根据起始IP生成IP地址的前三个部分,即IP地址的头部。接着根据起始IP和结束IP的范围,循环生成完整的IP地址,并将其添加到结果列表中。最后返回生成的IP地址列表。
handler_msg(conn)
函数是一个无限循环,用于处理消息并执行命令。在循环中,首先使用eval(recv_msg(c))
函数接收并解析JSON格式的消息数据。然后从消息中获取地址和命令信息。接下来调用CalculationIP()
函数生成地址列表。然后遍历地址列表,对每个地址执行命令,并将执行结果发送回客户端。执行过程中,会打印出对应的主机和执行的命令。在每次执行完命令后,程序会暂停1秒钟。如果在执行过程中发生异常,程序会退出。
import socket,struct,hashlib,base64 import threading,psutil # 获取请求头部数据,并将请求头转换为字典 def get_headers(data): headers = {} data = str(data, encoding="utf-8") header, body = data.split("\r\n\r\n", 1) header_list = header.split("\r\n") for i in header_list: i_list = i.split(":", 1) if len(i_list) >= 2: headers[i_list[0]] = "".join(i_list[1::]).strip() else: i_list = i.split(" ", 1) if i_list and len(i_list) == 2: headers["method"] = i_list[0] headers["protocol"] = i_list[1] print("请求类型: {} 请求协议: {}".format(i_list[0],i_list[1])) return headers # 接收数据时的解码过程 def parse_payload(payload): payload_len = payload[1] & 127 if payload_len == 126: mask = payload[4:8] decoded = payload[8:] elif payload_len == 127: mask = payload[10:14] decoded = payload[14:] else: mask = payload[2:6] decoded = payload[6:] # 将所有数据全部收集起来,对所有字符串编码 bytes_list = bytearray() for i in range(len(decoded)): chunk = decoded[i] ^ mask[i % 4] bytes_list.append(chunk) body = str(bytes_list, encoding='utf-8') return body # 封装并发送数据到浏览器 def send_msg(conn, msg_bytes): # 接收的第一个字节都是x81不变 first_byte = b"\x81" length = len(msg_bytes) if length < 126: first_byte += struct.pack("B", length) elif length <= 0xFFFF: first_byte += struct.pack("!BH", 126, length) else: first_byte += struct.pack("!BQ", 127, length) msg = first_byte + msg_bytes conn.sendall(msg) return True # 从浏览器中接收数据 def recv_msg(conn): data_recv = conn.recv(8096) if data_recv[0:1] == b"\x81": data_parse = parse_payload(data_recv) return data_parse return False # 建立握手流程并创建 handler_msg 完成数据收发 def handler_accept(sock): while True: conn, addr = sock.accept() data = conn.recv(8096) headers = get_headers(data) # 对请求头中的sec-websocket-key进行加密 response_tpl = "HTTP/1.1 101 Switching Protocols\r\n" \ "Upgrade:websocket\r\n" \ "Connection: Upgrade\r\n" \ "Sec-WebSocket-Accept: %s\r\n" \ "WebSocket-Location: ws://%s\r\n\r\n" # 加盐操作,此处是H5规范定义好的 magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' if headers.get('Sec-WebSocket-Key'): value = headers['Sec-WebSocket-Key'] + magic_string # 对数据进行加解密 ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest()) response_str = response_tpl % (ac.decode('utf-8'), headers.get("Host")) # 相应握手包数据 conn.sendall(bytes(response_str, encoding="utf-8")) t = threading.Thread(target=handler_msg, args=(conn, )) t.start() def CalculationIP(Addr_Count): ret = [] try: IP_Start = str(Addr_Count.split("-")[0]).split(".") IP_Heads = str(IP_Start[0] + "." + IP_Start[1] + "." + IP_Start[2] +".") IP_Start_Range = int(Addr_Count.split(".")[3].split("-")[0]) IP_End_Range = int(Addr_Count.split("-")[1]) for item in range(IP_Start_Range,IP_End_Range+1): ret.append(IP_Heads+str(item)) return ret except Exception: return 0 def handler_msg(conn): with conn as c: while True: try: ref_json = eval(recv_msg(c)) address = ref_json.get("address") command = ref_json.get("command") address_list = CalculationIP(address) for ip in address_list: response = {'addr': ip, 'status': 'success'} print("对主机: {} --> 执行: {}".format(ip,command)) send_msg(c, bytes(json.dumps(response) , encoding="utf-8")) time.sleep(1) except Exception: exit(0) if __name__ == "__main__": sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(("127.0.0.1", 10083)) sock.listen(5) t = threading.Thread(target=handler_accept(sock)) t.start()
到此这篇关于Python 实现 WebSocket 通信的文章就介绍到这了,更多相关Python WebSocket 通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!