python中使用websocket方法实例详解
作者:别出BUG求求了
WebSocket是一种网络通信协议,它在单个TCP连接上提供全双工的通信信道。在本篇文章中,我们将探讨如何在Python中使用WebSocket实现实时通信。
websockets是Python中最常用的网络库之一,也是websocket协议的Python实现。它不仅作为基础组件在众多项目中发挥着重要作用,其源码也值得广大“Python玩家”研究。
官网:https://github.com/python-websockets/websockets
1. 什么是WebSocket?
WebSocket协议是在2008年由Web应用程序设计师和开发人员创建的,目的是为了在Web浏览器和服务器之间提供更高效、更低延迟的双向通信。它允许客户端和服务器在任何时候发送消息,无需重新建立TCP连接。WebSocket可以在Web浏览器和服务器之间传输文本和二进制数据,使得构建实时Web应用程序变得更加简单。
2. 在Python中使用WebSocket
Python中有多个库可以帮助我们使用WebSocket,如:websockets、aiohttp等。在本文中,我们将使用websockets库来演示WebSocket编程。
要安装websockets库,你可以使用pip:
pip install websockets
3. 创建WebSocket服务器
使用websockets库,我们可以轻松地创建一个WebSocket服务器。以下是一个简单的示例:
import asyncio import websockets async def echo(websocket, path): async for message in websocket: print(f"Received message: {message}") await websocket.send(f"Echo: {message}") start_server = websockets.serve(echo, "localhost", 8765) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
在这个示例中,我们定义了一个名为echo的协程函数,它接收两个参数:websocket和path。该函数使用async for循环读取客户端发送的消息,并将消息发送回客户端。
然后,我们使用websockets.serve()函数创建一个WebSocket服务器,监听本地主机的8765端口。最后,我们使用asyncio的事件循环启动服务器。
4. 创建WebSocket客户端
要创建一个WebSocket客户端,我们同样可以使用websockets库。以下是一个简单的客户端示例:
import asyncio import websockets async def main(): async with websockets.connect("ws://localhost:8765") as websocket: message = "Hello, server!" await websocket.send(message) print(f"Sent: {message}") response = await websocket.recv() print(f"Received: {response}") asyncio.run(main())
在这个示例中,我们使用websockets.connect()函数建立与WebSocket服务器的连接。然后,我们使用send()方法向服务器发送消息,并使用recv()方法接收服务器的响应。
5. 总结
WebSocket协议为Web浏览器和服务器之间提供了实时双向通信的能力,使得构建实时Web应用程序变得更加容易。在Python中,我们可以使用websockets库轻松地实现WebSocket编程。
6. 通过websockets这个项目,从大型开源项目中学习asyncio库。
一、asyncio.Transport
在官方文档中,Transport被描述成对socket的抽象,它控制着如何传输数据。除了websockets,uvicorn、daphne等ASGI实现都会用到Transport。
Transport继承于ReadTransport和WriteTransport,两者都继承于BaseTransport。顾名思义,Transport兼备读和写的功能,可以类比为读写socket对象。
Transport对象提供以下常用函数——
is_reading:判断该Transport是否在读。
set_write_buffer_limits:设置写入Transport的高和低水位。考虑到网络状况,有时不希望写入过多的数据。
write、write_eof、write_line:为当前Transport写入数据,分别表示写入二进制数据、eof和二进制行数据。其中eof写入后不会关闭Transport,但会flush数据。
abort:立刻关闭Transport,不接受新的数据。留在缓冲的数据也会丢失,后续调用Protocol的connection_lost函数。
在websockets中,Transport使用场景不多,一般都是通过Protocol对象的回调参数使用的。在websocket的初始化过程中,会设置Transport的最高水位。同样,在这种场景下,该对象也是作为回调参数使用的。
二、asyncio.Protocol
如果Transport是对socket的抽象,那么Protocol就是对协议的抽象。它提供了如何使用Transport的方式。
用户使用的Protocol直接继承自BaseProtocol,并提供了六个Unimplemented函数需要用户去实现——
connection_made:当连接建立时会执行该函数,该函数包含一个Transport类型的参数。
connection_lost:当连接丢失或者关闭时会执行该函数,该函数包含一个Exception类型的参数。
pause_writing:当Transport对象写入的数据高于之前设置的高水位时被调用,一般会暂停数据的写入。
resume_writing:当Transport对象写入的数据低于之前设置的低水位时被调用,一般用于恢复数据写入。
data_received:当有数据被接受时回调,该函数包含一个二进制对象data,用来表示接受的数据。
eof_received:当被Transport对象被调用write_eof时被调用。
在websockets中,server端的connection_made实现截图如图所示。在该函数中,websockets将用户实现的handler封装成task对象,并和websocket的server绑定。
而在client端中实现如第一节截图所示,只是在reader中注册该Transport对象。
websockets的connection_lost函数实现方式如下。主要操作即更新状态、关闭pings、更新对应的waiter状态,以及维护reader对象。
在其他函数的实现中,websockets也主要用到了reader对象完成数据流的暂停和恢复,以及数据的写入。
从上面代码实现可以看出,websockets通过reader代理完成数据流的操作。这个reader是一个asyncio.StreamReader对象。这个对象具体如何使用将在下一篇介绍。
附录:进阶版本:
python使用websockets库
serve:在server端使用,等待客户端的连接。如果连接成功,返回一个websocket。
connect: 在client端使用,用于建立连接。
send:发送数据
recv:接收数据
close:关闭连接
服务端
#!/usr/bin/python3 # 主要功能:创建1个基本的websocket server, 符合asyncio 开发要求 import asyncio import websockets from datetime import datetime async def handler(websocket): data = await websocket.recv() reply = f"Data received as \"{data}\". time: {datetime.now()}" print(reply) await websocket.send(reply) print("Send reply") async def main(): async with websockets.serve(handler, "localhost", 9999): await asyncio.Future() # run forever if __name__ == "__main__": asyncio.run(main())
客户端
import asyncio import websockets import time async def ws_client(url): for i in range(1, 40): async with websockets.connect(url) as websocket: await websocket.send("Hello, I am PyPy.") response = await websocket.recv() print(response) time.sleep(1) asyncio.run(ws_client('ws://localhost:9999'))
服务端
import asyncio import websockets IP_ADDR = "127.0.0.1" IP_PORT = "9090" # 握手,通过接收Hi,发送"success"来进行双方的握手。 async def serverHands(websocket): while True: recv_text = await websocket.recv() print("recv_text=" + recv_text) if recv_text == "Hi": print("connected success") await websocket.send("success") return True else: await websocket.send("connected fail") # 接收从客户端发来的消息并处理,再返给客户端success async def serverRecv(websocket): while True: recv_text = await websocket.recv() print("recv:", recv_text) await websocket.send("success,get mess:"+ recv_text) # 握手并且接收数据 async def serverRun(websocket, path): print(path) await serverHands(websocket) await serverRecv(websocket) # main function if __name__ == '__main__': print("======server======") server = websockets.serve(serverRun, IP_ADDR, IP_PORT) asyncio.get_event_loop().run_until_complete(server) asyncio.get_event_loop().run_forever()
客户端
import asyncio import websockets IP_ADDR = "127.0.0.1" IP_PORT = "9090" async def clientHands(websocket): while True: # 通过发送hello握手 await websocket.send("Hi") response_str = await websocket.recv() # 接收"success"来进行双方的握手 if "success" in response_str: print("握手成功") return True # 向服务器端发送消息 async def clientSend(websocket): while True: input_text = input("input text: ") if input_text == "exit": print(f'"exit", bye!') await websocket.close(reason="exit") return False await websocket.send(input_text) recv_text = await websocket.recv() print(f"{recv_text}") # 进行websocket连接 async def clientRun(): ipaddress = IP_ADDR + ":" + IP_PORT async with websockets.connect("ws://" + ipaddress) as websocket: await clientHands(websocket) await clientSend(websocket) # main function if __name__ == '__main__': print("======client======") asyncio.get_event_loop().run_until_complete(clientRun())
服务端
# -*- coding:utf8 -*- import json import socket import asyncio import logging import websockets import multiprocessing IP = '127.0.0.1' PORT_CHAT = 9090 USERS ={} #提供聊天的后台 async def ServerWs(websocket,path): logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', filename="chat.log", level=logging.INFO) # 握手 await websocket.send(json.dumps({"type": "handshake"})) async for message in websocket: data = json.loads(message) message = '' # 用户发信息 if data["type"] == 'send': name = '404' for k, v in USERS.items(): if v == websocket: name = k data["from"] = name if len(USERS) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps( {"type": "user", "content": data["content"], "from": name}) # 用户注册 elif data["type"] == 'register': try: USERS[data["uuid"]] = websocket if len(USERS) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps( {"type": "login", "content": data["content"], "user_list": list(USERS.keys())}) except Exception as exp: print(exp) # 用户注销 elif data["type"] == 'unregister': del USERS[data["uuid"]] if len(USERS) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps( {"type": "logout", "content": data["content"], "user_list": list(USERS.keys())}) #打印日志 logging.info(data) # 群发 await asyncio.wait([user.send(message) for user in USERS.values()]) def server_run(): print("server") start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever() if __name__ == "__main__": from multiprocessing import Process multiprocessing.freeze_support() server = Process(target=server_run, daemon=False) server.start()
服务端
import asyncio import websockets import time import json import threading # 功能模块 class OutputHandler(): async def run(self,message,send_ms,websocket): # 用户发信息 await send_ms(message, websocket) # 单发消息 # await send_ms(message, websocket) # 群发消息 #await s('hi起来') # 存储所有的客户端 Clients = {} # 服务端 class WS_Server(): def __init__(self): self.ip = "127.0.0.1" self.port = 9090 # 回调函数(发消息给客户端) async def callback_send(self, msg, websocket=None): await self.sendMsg(msg, websocket) # 发送消息 async def sendMsg(self, msg, websocket): print('sendMsg:', msg) # websocket不为空,单发,为空,群发消息 if websocket != None: await websocket.send(msg) else: # 群发消息 await self.broadcastMsg(msg) # 避免被卡线程 await asyncio.sleep(0.2) # 群发消息 async def broadcastMsg(self, msg): for user in Clients: await user.send(msg) # 针对不同的信息进行请求,可以考虑json文本 async def runCaseX(self,jsonMsg,websocket): print('runCase') op = OutputHandler() # 参数:消息、方法、socket await op.run(jsonMsg,self.callback_send,websocket) # 连接一个客户端,起一个循环监听 async def echo(self,websocket, path): # 添加到客户端列表 # Clients.append(websocket) # 握手 await websocket.send(json.dumps({"type": "handshake"})) # 循环监听 while True: # 接受信息 try: # 接受文本 recv_text = await websocket.recv() message = "Get message: {}".format(recv_text) # 返回客户端信息 await websocket.send(message) # 转json data = json.loads(recv_text) # 用户发信息 if data["type"] == 'send': name = '404' for k, v in Clients.items(): if v == websocket: name = k data["from"] = name if len(Clients) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps({"type": "send", "content": data["content"], "from": name}) await self.runCaseX(jsonMsg=message, websocket=websocket) # 用户注册 elif data["type"] == 'register': try: Clients[data["uuid"]] = websocket if len(Clients) != 0: # asyncio.wait doesn't accept an empty list message = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())}) await self.runCaseX(jsonMsg=message, websocket=websocket) except Exception as exp: print(exp) # 用户注销 elif data["type"] == 'unregister': del Clients[data["uuid"]] # 对message进行解析,跳进不同功能区 # await self.runCaseX(jsonMsg=data,websocket=websocket) # 链接断开 except websockets.ConnectionClosed: print("ConnectionClosed...", path) # del Clients break # 无效状态 except websockets.InvalidState: print("InvalidState...") # del Clients break # 报错 except Exception as e: print("ws连接报错",e) # del Clients break # 启动服务器 async def runServer(self): async with websockets.serve(self.echo, self.ip, self.port): await asyncio.Future() # run forever # 多协程模式,防止阻塞主线程无法做其他事情 def WebSocketServer(self): asyncio.run(self.runServer()) # 多线程启动 def startServer(self): # 多线程启动,否则会堵塞 thread = threading.Thread(target=self.WebSocketServer) thread.start() # thread.join() if __name__=='__main__': print("server") s = WS_Server() s.startServer()
到此这篇关于python的websocket方法教程的文章就介绍到这了,更多相关python的websocket方法内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!