python Tornado异步使用场景源码解析
作者:NodeKey
前言
tornado在我工作前说实话我还没有听说过,今年回来后接触的非常多。
关于想要学习异步的内容,起因是如下场景:
tornado拿到请求后,对他的处理时间非常的长,一般大概有10s,晨旭让我把他写成异步的,如果只是对请求异步的是相当好写的。就是十分传统地在收到请求后立即返回,然后进行处理,处理了之后再返回给给定的callback_url
。
但我突然想到,能否对这整个处理进行异步,将其放到后台运行,然后继续接收请求不至于在请求上堵塞。
最后是没实现出来……坤爷让我去花时间了解一下tornado所著名的异步。于是我才发现,我这想法在tornado的异步中是不可行的。(而且错误的地方还蛮多的……
异步使用方式
from tornado import gen @gen.coroutine def fetch_coroutine(url): http_client = AsyncHTTPClient() response = yield http_client.fetch(url) raise gen.Return(response.body)
这是最常用也是tornado推荐的方式。不过这样写的前提是py2,py3有些新特性,写的方式就不太一样了。
正如代码中展示的,你需要给这个函数加个gen.coroutine
装饰器,然后在你想要异步处理的函数加个yield
。
说实话,这已经有点跟我想的不太一样了,callback_function
呢?没了?emmmmm好吧,这样写的确是没有的。
关于协程
简单介绍以下协程,协程是线程通过自己来调度任务,保存恢复上下文,而这样做的好处就是能减少切换线程或者进程的损耗,如果线程数量极多,但是做的事情不多,简单说运行类型是io密集型的话,可以考虑使用协程。
因为如果是cpu密集型的话,毕竟本质上还是一个线程,所以会堵塞到其他的协程,这与我们的高效目的是相违背的。
另外正式开始讲之前,我们首先需要明确的是一个函数如果带有yeild
,那么这个函数实质上就是一个生成器。我们直接对其调用它返回的也是一个生成器。所有会有这样一句话:
所有生成器都是异步的
实际上也确实如此,生成器立即返回,我们想要执行器内容的话,可以自行调用next
或者send
(非第一次)
于是python的yield对于协程有了天然的支持
源码解析
注:本文章分析的源码是 2018.12.24 的 stable 版本
coroutine
协程,也是 tornado.gen
中的一个装饰器
这个装饰器其内容只有一个 return。而在 tornado.gen
中其实还有个装饰器engine
,它的replace_callback
的 默认值为False
return _make_coroutine_wrapper(func, replace_callback=True)
以下是_make_coroutine_wrapper
的源码,源码十分紧凑,考虑的很全面,先说下大多数情况
def _make_coroutine_wrapper(func, replace_callback): wrapped = func if hasattr(types, 'coroutine'): func = types.coroutine(func) @functools.wraps(wrapped) def wrapper(*args, **kwargs): future = _create_future() if replace_callback and 'callback' in kwargs: warnings.warn("callback arguments are deprecated, use the returned Future instead", DeprecationWarning, stacklevel=2) callback = kwargs.pop('callback') IOLoop.current().add_future( future, lambda future: callback(future.result())) try: result = func(*args, **kwargs) except (Return, StopIteration) as e: result = _value_from_stopiteration(e) except Exception: future_set_exc_info(future, sys.exc_info()) try: return future finally: # Avoid circular references future = None else: if isinstance(result, GeneratorType): try: orig_stack_contexts = stack_context._state.contexts yielded = next(result) if stack_context._state.contexts is not orig_stack_contexts: yielded = _create_future() yielded.set_exception( stack_context.StackContextInconsistentError( 'stack_context inconsistency (probably caused ' 'by yield within a "with StackContext" block)')) except (StopIteration, Return) as e: future_set_result_unless_cancelled(future, _value_from_stopiteration(e)) except Exception: future_set_exc_info(future, sys.exc_info()) else: runner = Runner(result, future, yielded) # 这一行不太理解,注释是说让它保持存在 future.add_done_callback(lambda _: runner) yielded = None try: return future finally: future = None future_set_result_unless_cancelled(future, result) return future wrapper.__wrapped__ = wrapped wrapper.__tornado_coroutine__ = True return wrapper
coroutine 装饰的函数,首先会生成一个 generator, 并对其第一次 next 调用。
这里我们可以注意到,这个yield
后的函数无论如何它都是会被立刻调用的。
所以 yield 后的函数必须也是异步或者耗时不长的,才能达到预期的异步效果,否则该阻塞的还是会阻塞。
在 next
调用后,有个异常捕获,在代码 36L,这个异常捕获犹为重要,因为我们知道 yield
等式,比如var = yield func()
,var
不会被赋值成func
的返回。tornado
提供了一个异常类Return
作为返回,通过在调用出捕获Return
异常,取出其返回值得以实现。
在第一次的next
后,如果没有其他异常,就会创建一个Runner
类,这个Runner
类的作用就是,把其他的代码通过yield
不断暂停恢复,放在ioloop里运行。
Future
Future
可以说是个中介,它是用来沟通coroutine
和ioloop
的,coroutine
返回的都是Future
但其实最重要的还是管理协程的暂停与恢复,一个ioloop中保存着多个后端运行类Runner
类的runner
方法,在ioloop中不断暂停恢复,而每一个runner
又都会绑定一个future
,只有future
被set_done
了,才表示上一阶段已经完成并暂停了,可以继续恢复运行。
class Future(object): def done(self):# 协程执行完毕并暂停,可对其恢复 return self._done def result(self, timeout=None): self._check_done() # 如果没有 done 抛出异常 return self._result def add_done_callback(self, fn): # 添加回调函数 if self._done: from tornado.ioloop import IOLoop IOLoop.current().add_callback(fn, self) else: self._callbacks.append(fn) def set_result(self, result): # 设置result & done self._result = result self._set_done() def _set_done(self): # 将所有回调函数放到 ioloop中 self._done = True if self._callbacks: from tornado.ioloop import IOLoop loop = IOLoop.current() for cb in self._callbacks: loop.add_callback(cb, self) self._callbacks = None
IOLoop
IOLoop是在整个tornado的主事件循环。按我理解主要做了两件事
- 执行异步的callback
- io复用
并且这两件事它是写死的,它是顺序执行的,这就直接反驳了我最开始的想法:让两者并行执行
io复用根据系统会自动调整的,具体的我也不再细说。
以下是精简之后的源码
class PollIOLoop(IOLoop): def add_future(self, future, callback): assert is_future(future) callback = stack_context.wrap(callback) future_add_done_callback( future, lambda future: self.add_callback(callback, future)) def add_callback(self, callback, *args, **kwargs): if self._closing: return self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) # 将其以偏函数形式保存起来 if thread.get_ident() != self._thread_ident: self._waker.wake() else: pass def start(self): # 这里有一堆初始化操作 while True: ncallbacks = len(self._callbacks) due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) if not self._running: break # 这里有设置poll_timeout event_pairs = self._impl.poll(poll_timeout) self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None # 这里有一些后置处理
Runner
Runner.run
是后置处理的主函数,我在之前也有提到过,它通过获取future
的result
,再将其send
以恢复协程继续执行。
如果不能捕获到任何异常,就说明有新的coroutine
,新的coroutine
都是通过handle_yield
将其放进ioloop
class Runner(object): def __init__(self, gen, result_future, first_yielded): self.gen = gen self.result_future = result_future self.future = _null_future self.yield_point = None self.pending_callbacks = None self.results = None self.running = False self.finished = False self.had_exception = False self.io_loop = IOLoop.current() self.stack_context_deactivate = None # 上面一堆不需要看的初始化 if self.handle_yield(first_yielded): gen = result_future = first_yielded = None self.run() def handle_yield(self, yielded): self.future = convert_yielded(yielded) if self.future is moment: self.io_loop.add_callback(self.run) return False elif not self.future.done(): def inner(f): # Break a reference cycle to speed GC. f = None self.run() self.io_loop.add_future( self.future, inner) return False return True def run(self): if self.running or self.finished: return try: self.running = True while True: future = self.future if not future.done(): return self.future = None try: orig_stack_contexts = stack_context._state.contexts exc_info = None try: value = future.result() except Exception: self.had_exception = True exc_info = sys.exc_info() future = None yielded = self.gen.send(value) except (StopIteration, Return) as e: self.finished = True self.future = _null_future if self.pending_callbacks and not self.had_exception: raise LeakedCallbackError( "finished without waiting for callbacks %r" % self.pending_callbacks) future_set_result_unless_cancelled(self.result_future, _value_from_stopiteration(e)) self.result_future = None self._deactivate_stack_context() return except Exception: # 一些结束操作 return if not self.handle_yield(yielded): return yielded = None finally: self.running = False
以上就是python Tornado异步使用场景源码解析的详细内容,更多关于python Tornado异步的资料请关注脚本之家其它相关文章!