python json-rpc 规范源码阅读
json-rpc 源码阅读
JSON-RPC是一个无状态且轻量级的远程过程调用(RPC)协议。JSON-RPC应用很广泛,比如以太坊的API。JSON-RPC的python实现较多,我选择了Exploding Labs 提供的python版本。主要是其它库都比较古老,而e-labs的实现采用最新版本python,支持类型系统,还有一些函数式编程的范式,代码也很简洁,值得学习。
e-labs的JSON-RPC分成客户端和服务端两个库,分别是jsonrpcclient和jsonrpcserver, 代码版本如下表:
名称 | 版本 |
---|---|
jsonrpcclient | 4.0.2 |
jsonrpcserver | 5.0.9 |
准备好代码后,我们可以开始json-rpc的源码阅读,本文包括下面几个部分:
- JSON-RPC规范
- jsonrpcclient的实现
- jsonrpcserver的实现
- 小结
- 小技巧
JSON-RPC规范
JSON-RPC规范,我这里借用jsonrpcserver中的验证规则文件简单介绍一下,文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | # request-schema.json { "$schema" : "http://json-schema.org/draft-04/schema#" , "description" : "A JSON RPC 2.0 request" , "oneOf" : [ { "description" : "An individual request" , "$ref" : "#/definitions/request" }, { "description" : "An array of requests" , "type" : "array" , "items" : { "$ref" : "#/definitions/request" }, "minItems" : 1 } ], "definitions" : { "request" : { "type" : "object" , "required" : [ "jsonrpc" , "method" ], "properties" : { "jsonrpc" : { "enum" : [ "2.0" ] }, "method" : { "type" : "string" }, "id" : { "type" : [ "string" , "number" , "null" ], "note" : [ "While allowed, null should be avoided: http://www.jsonrpc.org/specification#id1" , "While allowed, a number with a fractional part should be avoided: http://www.jsonrpc.org/specification#id2" ] }, "params" : { "type" : [ "array" , "object" ] } }, "additionalProperties" : false } } } |
文件描述了JSON-RPC的规则,如下:
- json-rpc请求可以是单个的request对象,也是是批量的request对象数组
- 每个request对象需要符合:
- 必填字段jsonrpc,值枚举类型。目前2.0,其实就是版本号。(之前有1.0版本)
- 必填字段method, 字符串类型。远程函数的名称。
- id字段,支持字符串,数字或者空。为空表示通知无需回应(result)。id确保响应可以一一对应到请求上。
- params字段,支持数组或者字典。
JSON-RPC响应部分的规则是:
- jsonrpc字段,值为2.0
- result字段,值为调用结果
- error字段,值为异常信息,包括code,message和data三个字段,规范定义了详细的错误清单。
- id同请求的id
- result和error二选一
强烈建议大家阅读参考链接中的规范原文,介绍的非常清晰,中文翻译也很到位,有助于对JSON-RPC规范完全理解。
jsonrpcclient的实现
模块文件 | 功能描述 |
---|---|
id_generators.py | id生成器 |
requests.py | 请求信息封装 |
response.py | 响应信息封装 |
sentinels.py | 定义NOID,用于通知类请求 |
utils.py | 一些工具函数 |
examples | 一些示例 |
从示例可以知道JSON-RPC,可以使用不同的底层协议比如http,websocket和tcp(zeromq实现)等。我们看最简单的基于http实现的实例:
1 2 3 4 5 6 7 8 9 | from jsonrpcclient import request, parse, Ok import logging import requests response = requests.post( "http://localhost:5000/" , json = request( "ping" )) parsed = parse(response.json()) if isinstance (parsed, Ok): print (parsed.result) else : logging.error(parsed.message) |
这段api展示了:
- jsonrpcclient只是封装请求request和响应Ok,数据请求的发送由不同协议提供,这里使用requests,另外还有aiohttp的实现等。
- resquest函数封装请求,parse解析响应
- 正常的结果展示result信息,错误的结果展示message信息
request代码很简单, 封装请求成符合JSON-RPC规范的字符串:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | # requests.py def request_pure( id_generator: Iterator[ Any ], method: str , params: Union[ Dict [ str , Any ], Tuple [ Any , ...]], id : Any , ) - > Dict [ str , Any ]: return { "jsonrpc" : "2.0" , "method" : method, * * ( { "params" : list (params) if isinstance (params, tuple ) else params} if params else {} ), "id" : id if id is not NOID else next (id_generator), } def request_impure( id_generator: Iterator[ Any ], method: str , params: Union[ Dict [ str , Any ], Tuple [ Any , ...], None ] = None , id : Any = NOID, ) - > Dict [ str , Any ]: return request_pure( id_generator or id_generators.decimal(), method, params or (), id ) request_natural = partial(request_impure, id_generators.decimal()) ... request = request_natural |
所以示例中的请求,可以等价下面的curl命令:
1 | $ curl -X POST http: //localhost :5001 -d '{"jsonrpc": "2.0", "method": "ping", "params": {}, "id": 1}' |
response处理也很简单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | # response.py class Ok(NamedTuple): result: Any id : Any def __repr__( self ) - > str : return f "Ok(result={self.result!r}, id={self.id!r})" class Error(NamedTuple): code: int message: str data: Any id : Any def __repr__( self ) - > str : return f "Error(code={self.code!r}, message={self.message!r}, data={self.data!r}, id={self.id!r})" Response = Union[Ok, Error] |
定义Response类型,是Ok或者Error。Ok和Error是两个可命名元祖。
parse就是将结果json字典解析成对应的Response:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | def to_result(response: Dict [ str , Any ]) - > Response: return ( Ok(response[ "result" ], response[ "id" ]) if "result" in response else Error( response[ "error" ][ "code" ], response[ "error" ][ "message" ], response[ "error" ].get( "data" ), response[ "id" ], ) ) def parse(response: Deserialized) - > Union[Response, Iterable[Response]]: return ( map (to_result, response) if isinstance (response, list ) else to_result(response) ) |
也可以直接使用parse_json函数,从json字符串生成结果:
这里的map,componse等都是函数式编程。在server中函数式编程使用的更多,可见作者非常喜欢函数式编程的思想
jsonrpcserver的实现
jsonrpcclient实现非常简单,jsonrpcserver的实现会略微复杂点,但是还是可以很好的理解的,我们一起继续。jsonrpcserver的主要模块如下:
模块 | 描述 |
---|---|
main.py/async_main.py | main文件,分别是同步和异步版本 |
dispatcher.py/async_dispatcher.py | rpc服务的分配器实现 |
methods.py | rpc函数的装饰器 |
request.py | 请求处理 |
response.py | 响应处理 |
result.py | 结果处理 |
examplse | 一些示例 |
通用,我们先从示例入手,看看api的使用。下面是flask版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | # flask_server.py from flask import Flask, Response, request from jsonrpcserver import method, Result, Success, dispatch app = Flask(__name__) @method def ping() - > Result: return Success( "pong" ) @app .route( "/" , methods = [ "POST" ]) def index(): return Response( dispatch(request.get_data().decode()), content_type = "application/json" ) if __name__ = = "__main__" : app.run() |
从示例我们可以知道,rpc服务其实就2大步骤:
- 使用method装饰ping函数,使它支持rpc调用,ping函数返回的是一个特点的Result数据结构
- 所有rpc调用的http-url都是根目录,服务使用dispatch调度rpc请求
先看第一步rpc装饰器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | # methods.py Method = Callable [..., Result] Methods = Dict [ str , Method] global_methods = dict () def method( f: Optional[Method] = None , name: Optional[ str ] = None ) - > Callable [..., Any ]: """A decorator to add a function into jsonrpcserver's internal global_methods dict. The global_methods dict will be used by default unless a methods argument is passed to `dispatch`. Functions can be renamed by passing a name argument: @method(name=bar) def foo(): ... """ def decorator(func: Method) - > Method: nonlocal name global_methods[name or func.__name__] = func return func return decorator(f) if callable (f) else cast(Method, decorator) |
- 将所有的rpc函数都封装到global_methods字典中
- 函数需要返回Result类型
第2步中,main模块提供了dispatch的api,主要就是下面的函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | # main.py def dispatch_to_response( request: str , methods: Optional[Methods] = None , * , context: Any = NOCONTEXT, deserializer: Callable [[ str ], Deserialized] = json.loads, validator: Callable [[Deserialized], Deserialized] = default_validator, post_process: Callable [[Response], Any ] = identity, ) - > Union[Response, List [Response], None ]: """Takes a JSON-RPC request string and dispatches it to method(s), giving Response namedtuple(s) or None. This is a public wrapper around dispatch_to_response_pure, adding globals and default values to be nicer for end users. Args: request: The JSON-RPC request string. methods: Dictionary of methods that can be called - mapping of function names to functions. If not passed, uses the internal global_methods dict which is populated with the @method decorator. context: If given, will be passed as the first argument to methods. deserializer: Function that deserializes the request string. validator: Function that validates the JSON-RPC request. The function should raise an exception if the request is invalid. To disable validation, pass lambda _: None. post_process: Function that will be applied to Responses. Returns: A Response, list of Responses or None. Examples: >>> dispatch('{"jsonrpc": "2.0", "method": "ping", "id": 1}') '{"jsonrpc": "2.0", "result": "pong", "id": 1}' """ return dispatch_to_response_pure( deserializer = deserializer, validator = validator, post_process = post_process, context = context, methods = global_methods if methods is None else methods, request = request, ) |
- request 请求的函数名称
- methods 可供调用的函数集合,默认就是之前rpc装饰器中存储的global_methods
- deserializer 请求的反序列化函数,validator请求验证器
- post_process响应处理函数
post_process主要就是根据结果类型,分别取不同的字段并序列化:
1 2 3 4 5 6 | def to_serializable_one(response: ResponseType) - > Union[Deserialized, None ]: return ( serialize_error(response._error) if isinstance (response, Left) else serialize_success(response._value) ) |
dispatch的实现,主要是下面2个函数dispatch_request和call,前者查找rpc函数,后者执行rpc函数。dispatch_request内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | def dispatch_request( methods: Methods, context: Any , request: Request ) - > Tuple [Request, Result]: """Get the method, validates the arguments and calls the method. Returns: A tuple containing the Result of the method, along with the original Request. We need the ids from the original request to remove notifications before responding, and create a Response. """ return ( request, get_method(methods, request.method) .bind(partial(validate_args, request, context)) .bind(partial(call, request, context)), ) |
这里使用了oslash这个函数式编程库,我们可以简单的使用unix的管道思想去理解:
- 使用get_method查找rpc响应函数
- 使用validate_args验证rpc请求
- 使用call执行rpc调用
- 3个步骤依次执行,前者的返回值会作为后缀的参数
重中之重是call函数,原理非常简单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | def call(request: Request, context: Any , method: Method) - > Result: """Call the method. Handles any exceptions raised in the method, being sure to return an Error response. Returns: A Result. """ try : result = method( * extract_args(request, context), * * extract_kwargs(request)) # validate_result raises AssertionError if the return value is not a valid # Result, which should respond with Internal Error because its a problem in the # method. validate_result(result) # Raising JsonRpcError inside the method is an alternative way of returning an error # response. except JsonRpcError as exc: return Left(ErrorResult(code = exc.code, message = exc.message, data = exc.data)) # Any other uncaught exception inside method - internal error. except Exception as exc: logger.exception(exc) return Left(InternalErrorResult( str (exc))) return result |
- 使用args和kwargs动态执行rpc函数,并将结果进行返回
- 捕获异常,返回标准错误
这里的Left是函数式编程中的概念,我们可以从response的实现,简单了解一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 | # response.py class SuccessResult(NamedTuple): result: Any = None class ErrorResult(NamedTuple): code: int message: str data: Any = NODATA # The spec says this value may be omitted # Union of the two valid result types Result = Either[ErrorResult, SuccessResult] def Success( * args: Any , * * kwargs: Any ) - > Either[ErrorResult, SuccessResult]: return Right(SuccessResult( * args, * * kwargs)) def Error( * args: Any , * * kwargs: Any ) - > Either[ErrorResult, SuccessResult]: return Left(ErrorResult( * args, * * kwargs)) |
SuccessResult和ErrorResult是python的两个标准对象;Result是oslash中定义的联合对象,在ErrorResult, SuccessResult中二选一,有些类似rust中的Option;Right封装了正确的结果,Left封装了错误的结果。
这一部分需要一些函数式编程的基础,如果不太理解,推荐阅读参考链接。
小结
我们一起学习了JSON-RPC规范,并且了解了Exploding Labs如何使用 现代python 实现该规范,也接触了一些函数式编程的方式。
小技巧
业务有时候需要自己实现一个简单的自增id,我们也许会用全局变量来做:
全局变量会形成一些污染,利用闭包的特性,我们可以优化成这样:
1 2 3 4 5 6 7 8 9 | def gen2(): start = 0 def incr(): start + = 1 return count return incr gen = gen2() # 调用 id = gen() |
json-rpc里提供了使用yeild关键字实现的版本:
1 2 3 4 5 6 7 8 9 10 | def hexadecimal(start: int = 1 ) - > Iterator[ str ]: """ Incremental hexadecimal numbers. e.g. 1, 2, 3, .. 9, a, b, etc. Args: start: The first value to start with. """ while True : yield "%x" % start start + = 1 |
参考链接
以上就是python json-rpc 规范源码阅读的详细内容,更多关于python json-rpc 规范的资料请关注脚本之家其它相关文章!

微信公众号搜索 “ 脚本之家 ” ,选择关注
程序猿的那些事、送书等活动等着你
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 reterry123@163.com 进行投诉反馈,一经查实,立即处理!
相关文章
为Python的Tornado框架配置使用Jinja2模板引擎的方法
Jinja2是人气Web框架Flask中的内置模板引擎,而且与Django的模板引擎比较类似,这里我们就来看一下为Python的Tornado框架配置使用Jinja2模板引擎的方法2016-06-06
最新评论