Python gRPC流式通信协议详细讲解
作者:雕弓
ProtoBuf 协议
gRPC使用 protocol buffer 协议做为接口描述语言(IDL) ,来定义接口及传递的消息数据类型。
gRPC的message 与 service 均须在protobuf 文件中定义好,才能进行编译。 该文件须按protobuf 语法编写, 也比较简单,当前只须学习gRPC用到的部分。
如下例:
service HelloService { rpc SayHello (HelloRequest) returns (HelloResponse); } message HelloRequest { string greeting = 1; } message HelloResponse { string reply = 1; }
说明:
- syntax = “proto3” protobuf的版本号
- package tutorial 主要是java用,python是用文件名做为module 名,可以不需要定义
- Service : 就是定义1个 gRPC service, 在service代码块内,定义rpc 方法,指定request, response类型。 gRPC支持4种rpc方法
service interface_name { rpc api_name( request ) returns ( response ); }
Message: 相当于接口函数的参数类型与返回值类型 ,需要分开定义
详细 protobuf 使用教程,请参考菜鸟教程tutorialspoint 的教程 https://www.tutorialspoint.com/protobuf/index.htm
gRPC 4种通信模式介绍
1. 单向RPC
gRPC的术语为unary RPC,这种方式与函数调用类似, client 发送1条请求,server回1条响应。
rpc SayHello(HelloRequest) returns (HelloResponse);
2. 服务器流式处理 RPC
客户端向服务器发送1条请求,服务器以回应多条响应消息,客户机从返回的流中读取数据,直至没有更多消息。 这时要在响应类型前加1个 stream关键字修饰。
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
3. 客户端流式处理 RPC
由客户端写入一系列消息并将其发送到服务。 客户端完成消息写入后,它将等待服务器读取消息并返回其响应. 这种模式,要在request的类型前加 stream 修饰。
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
4. 双向流式处理 RPC
其中双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按照它们喜欢的任何顺序进行读取和写入:例如,服务器可以等待接收所有客户端消息,然后再写入响应,或者它可以交替读取消息然后写入消息,或者读取和写入的某种其他组合。将保留每个流中消息的顺序。此模式下, request 与 response类型均需要用stream修饰。
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
说明
- 流式处理中,服务器A向客户机B发送数据,必须是在protobuf 中 rpc方法中Response message包含的数据类型。而不是任意发送数据,程序会报错。不限制的是响应中可以回1条或多条消息。
- 如果需要A向B发送请求,则需要定义新的protobuf 文件,并编写新的server, client python代码,这时,两台机器的角色是倒过来,B为server, A为client。仍然符合gRPC的原则。
- 如果一定需要在1条连接中,双方互发请求,socket 模块的低阶API接口函数编程可以满足要求,但必须注意,管理socket双向通信必须小心翼翼,否则会造成混乱,
流程处理实现过程
1. 用protobuf 定义接口
下面以实例说明: users.proto ,
syntax = "proto3"; package users; message User { string username = 1; uint32 user_id = 2; } message CreateUserRequest { string username = 1; string password = 2; string email = 3; } message CreateUserResult { User user = 1; } message GetUsersRequest { repeated User user = 1; } service Users { rpc CreateUser (users.CreateUserRequest) returns (users.CreateUserResult); rpc GetUsers (users.GetUsersRequest) returns (stream users.GetUsersResult); } message GetUsersResult { User user = 1; }
2. 根据.protobuf文件生成客户方与服务方代码
首先要安装 grpcio-tools package:
pip install grpcio-tools
进入proto文件所在目录,执行如下命令
python -m grpc_tools.protoc
\ --proto_path=.
\ --python_out=.
\ --grpc_python_out=.
\ proto文件名
参数说明
- proto_path=proto文件路径
- python_out=编译生成的文件的路径
- grpc_python_out=编译生成的接口文件路径
- ./route_guide.proto 是要编译的协议文件
本例 :
python -m grpc_tools.protoc --proto_path=. --python_out=. --grpc_python_out=. users.proto
生成的文件有两个: Users_pb2.py 与 Users_pb2_grpc.py,
3. 服务器端代码
from concurrent import futures import time import grpc import users_pb2_grpc as users_service import users_pb2 as users_messages _ONE_DAY_IN_SECONDS = 60 * 60 * 24 class UsersService(users_service.UsersServicer): def CreateUser(self, request, context): metadata = dict(context.invocation_metadata()) print(metadata) user = users_messages.User(username=request.username, user_id=1) return users_messages.CreateUserResult(user=user) def GetUsers(self, request, context): for user in request.user: user = users_messages.User( username=user.username, user_id=user.user_id ) yield users_messages.GetUsersResult(user=user) def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) users_service.add_UsersServicer_to_server(UsersService(), server) server.add_insecure_port('0.0.0.0:50051') server.start() try: while True: time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': serve()
4. 客户端侧代码
import sys import grpc import users_pb2_grpc as users_service import users_pb2 as users_messages def run(): channel = grpc.insecure_channel('localhost:50051') try: grpc.channel_ready_future(channel).result(timeout=10) except grpc.FutureTimeoutError: sys.exit('Error connecting to server') else: stub = users_service.UsersStub(channel) metadata = [('ip', '127.0.0.1')] response = stub.CreateUser( users_messages.CreateUserRequest(username='tom'), metadata=metadata, ) if response: print("User created:", response.user.username) request = users_messages.GetUsersRequest( user=[users_messages.User(username="alexa", user_id=1), users_messages.User(username="christie", user_id=1)] ) response = stub.GetUsers(request) for resp in response: print(resp) if __name__ == '__main__': run()
5. 测试代码
打开两个终端窗口,分别运行grpc_server.py, grpc_client.py
可以看到client.py 窗口显示
(enva) D:\workplace\python\enva\test1>py grpc_client.py User created: tom user { username: "alexa" user_id: 1 } user { username: "christie" user_id: 1 }
服务器窗口同时显示
(enva) D:\workplace\python\enva\test1>py grpc_server.py
{'user-agent': 'grpc-python/1.50.0 grpc-c/28.0.0 (windows; chttp2)', 'ip': '127.0.0.1'}
学习小记
流式处理编程,其实比较简单,只是流式处理一方要构建多条mesage,接口方法会自动逐条发送,接收侧也只须遍历读取即可。流式处理用来发送大文件,如图片,视频之类,比REST有明显优势,而且有规范接口,也便于团队合作。
到此这篇关于Python gRPC流式通信协议详细讲解的文章就介绍到这了,更多相关Python gRPC流式通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!