简单理解Python中的事件循环EventLoop
作者:songcser
简介
在 python 3中,加入了 asyncio 模块,来实现协程,其中一个很重要的概念是事件循环,整个异步流程都是事件循环推动的。下面自己实现一个相对简单的EventLoop,了解一下事件循环是如何进行运转的。
事件循环
下面看一下整个流程的实现过程
将以下代码写入 spider_event_loop.py 文件:
# spider_event_loop.py import time import os import socket from urllib.parse import urlparse from collections import deque from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ # selector = DefaultSelector() urls = ['https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image', 'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image', 'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image', 'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image', 'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image', 'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image', 'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image', 'https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image', 'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image', 'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image' ] # 事件循环实现 class EventLoop: def __init__(self): # 整个流程是否 self._stopped = False self._ready = deque() self._selector = DefaultSelector() # 向 _ready 队列中加入回调方法 def call_soon(self, callback, *args): # 将事件添加到队列里 handle = Handle(callback, self, *args) self._ready.append(handle) # 套接字注册读事件 def add_writer(self, fd, callback, *args): # 将回调方法封装成Handle handle = Handle(callback, self, *args) try: # 检查有没有注册过 key = self._selector.get_key(fd) except KeyError: # 没有注册过,进行写事件注册 self._selector.register(fd, EVENT_WRITE, handle) else: # 注册过,添加写事件 mask = key.events self._selector.modify(fd, mask | EVENT_WRITE, handle) # 套接字注销读事件 def remove_writer(self, fd): try: # 检查有没有注册过 self._selector.get_key(fd) except KeyError: # 没有,直接返回 return # 注销事件 self._selector.unregister(fd) # 套接字注册写事件 def add_reader(self, fd, callback, *args): # 将回调方法,封装成Handle handle = Handle(callback, self, *args) try: # 检查是否注册过 key = self._selector.get_key(fd) except KeyError: # 没有注册过,进行读事件注册 self._selector.register(fd, EVENT_READ, handle) else: # 注册过,添加读事件 mask = key.events self._selector.modify(fd, mask | EVENT_READ, handle) # 套接字注销写事件 def remove_reader(self, fd): try: # 检查有没有注册过 self._selector.get_key(fd) except KeyError: # 没有,直接返回 return # 注销事件 self._selector.unregister(fd) # 处理事件 def _process_events(self, event_list): for key, mask in event_list: fileobj, handle = key.fileobj, key.data if mask & EVENT_READ: self.remove_reader(fileobj) if mask & EVENT_WRITE: self.remove_writer(fileobj) # 注册的事件发生,将注册时的handle放入队列中 self._ready.append(handle) # 运行事件队列,执行回调方法 def run_once(self): # 获取发生事件的所有文件描述符 event_list = self._selector.select() self._process_events(event_list) while self._ready: # 将队列中的handle取出,执行回调方法 handle = self._ready.popleft() handle.run() # 无限循环,直到stopped def run_forever(self): while True: # 执行事件 self.run_once() if self._stopped: break # 执行协程方法,直到事件循环执行结束 def run_until_complete(self, future): # 在 future 中添加回调方法,在 future set_value 的时候执行这个回调方法 future.add_done_callback(self._run_until_complete_cb) # 开始无限循环 self.run_forever() # 返回 future 的值 return future.value # 结束时的回调方法 def _run_until_complete_cb(self, future): # 结束无限循环 self.close() # 关闭方法 def close(self): self._stopped = True # 创建 Future 实例 def create_future(self): return Future(loop=self) # 创建 Task 实例 def create_task(self, coro): return Task(coro, loop=self)
以上类是一个简单是事件循环,事件循环基本的操作都已经包含,注册注销事件,无限循环获取文件描述符的事件,执行Handle队列,这里是使用的 _ready 队列,表示已经准备好的 Handle,由于没有延时事件所以省略了调度队列
# Handle 类,对回调方法做封装 class Handle: def __init__(self, callback, loop, *args): self._callback = callback self._args = args # 执行回调方法 def run(self): self._callback(*self._args)
Handle 类对回调方法进行封装
# Future 类 class Future: def __init__(self, loop=None): self.value = None # 将要执行的回调方法 self._step_func = [] # 和事件循环关联 if loop: self._loop = loop else: self._loop = get_event_loop() def add_done_callback(self, func): # 添加回调方法 self._step_func.append(func) def set_value(self, value): # 设置值 self.value = value for func in self._step_func: # 将回调方法添加到事件循环的 _ready 队列中 # 在下次事件循环中执行回调方法 self._loop.call_soon(func, self) # 实现 __iter__ 方法,Future 类的实例为可迭代对象 def __iter__(self): # 该语句起到暂停协程的作用,并返回实例本身 yield self # 该语句定义的返回值会赋给 yield from 语句等号前面的变量 return self.value
Future类和以前的实现,区分不是很大,只是和 loop 做关联,将回调方法放入事件循环的队列中,依靠事件循环执行回调方法
# Task 类,继承 Future 类, 也是可迭代的 class Task(Future): def __init__(self, coro, loop=None): super().__init__(loop=loop) self.coro = coro # step 方法直接放入事件循环的队列中进行执行 # 激活协程方法运行 self._loop.call_soon(self.step, self) def step(self, future): try: # 向协程发送数据,驱动协程执行 # 直到遇到 yield ,返回新的 Future实例 new_futrue = self.coro.send(future.value) except StopIteration as exc: # 在协程方法执行到结束,或者 return 之后,抛出 StopIteration 异常 # 由于 Task 也是 Future, 在协程执行完之后,最后 Task 执行自己的回调方法 self.set_value(exc.value) return # 将 step 加入回调列表,等待下次驱动执行 # 在 Future 执行 set_value 时,又会执行 step 方法,再次驱动协程执行 new_futrue.add_done_callback(self.step)
Task 类继承 Future 类,类的实例也是可迭代对象,在 step 方法会不断触发协程继续执行,在协程执行结束之后,抛出 StopIteration 异常,最后 Task 类再执行回调方法,主要做一些清理工作或者收集结果。
class AsyncSocket: def __init__(self, loop=None): # 绑定事件循环 if loop: self._loop = loop else: self._loop = get_event_loop() self.sock = socket.socket() self.sock.setblocking(False) # 该方法用于向服务器发送连接请求并注册监听套接字的可写事件 def connect(self, address): # 由 loop 创建 Future 实例 f = self._loop.create_future() try: self.sock.connect(address) except BlockingIOError: pass # 注册写事件,在连接成功事件发生之后,调用回调方法 self._loop.add_writer(self.sock.fileno(), self._connect_cb, f) # 暂停执行,等待写事件发生 yield from f # 回调方法,连接事件发生 def _connect_cb(self, future): # 设置 Future 值,并且驱动协程继续执行 future.set_value(None) # 向服务器发送获取图片的请求 def send(self, data): self.sock.send(data) # 该方法会多次执行,以获取服务器返回的数据片段 def read(self): f = self._loop.create_future() # 注册读事件,在可读事件发生之后,调用回调方法 self._loop.add_reader(self.sock.fileno(), self._read_cb, f, self.sock) # 暂停执行,等待读事件发生 yield from f # 返回最后的值 return f.value # 回调方法,读事件发生 def _read_cb(self, future, sock): # 套接字读取 4096 字节的数据,设置 Future 值,并且驱动协程继续执行 future.set_value(sock.recv(4096)) # 读取所有数据 def read_all(self): data = b'' while True: # 不断读取 sock 的数据 value = yield from self.read() if value: data += value else: return data # 关闭客户端套接字 def close(self): self.sock.close()
AsyncSocket 类是对 Socket 做的简单封装,绑定事件循环 Loop ,读写事件都是在 Loop 中注册,由事件循环来驱动 Socket 的读写。
# 爬虫类 class Crawler: def __init__(self, url): self._url = url self.url = urlparse(url) self.response = b'' def fetch(self): self.time = time.time() # AsyncSocket 类的实例对象负责完成数据获取的工作 sock = AsyncSocket() # 向服务器发送连接请求,协程会暂停到嵌套协程中的某个 yield from 处 yield from sock.connect((self.url.netloc, 80)) data = 'GET {0} HTTP/1.1\r\nHost: {1}\r\nConnection: close\r\n\r\n \ '.format(self.url.path, self.url.netloc) # 发送请求数据 sock.send(data.encode()) # 读取全部的数据 self.response = yield from sock.read_all() # 关闭 socket sock.close() # 将下载的图片写入文件 with open('pic/{}'.format(self.url.path[1:].split('/')[-1]), 'wb') as f: f.write(self.response.split(b'\r\n\r\n')[1]) return "URL: {0}, 耗时: {1:.3f}s".format(self._url, time.time() - self.time)
Crawler 完成图片爬取工作,在 fetch 方法中,AsyncSocket 完成请求连接,发送数据,接收数据的工作,整个流程非常的清晰,和同步阻塞模式流程基本相同,但是性能会有大量提升
# 事件循环,全局变量 _event_loop = None # 获取事件循环,这里是要获取同一个全局实例 def get_event_loop(): global _event_loop if _event_loop is None: # 生成一个新的事件循环实例 _event_loop = EventLoop() return _event_loop # 收集所有的 task def gather(tasks, loop=None): # 使用 Future 类 收集 所有 tasks 的结果 outer = Future(loop=loop) # tasks 数量 count = len(tasks) nfinished = 0 # 收集结果 results = [] # 回调方法 def _gather_cb(f): nonlocal nfinished # 完成数量 nfinished += 1 # 收集结果 results.append(f.value) if nfinished == count: # 都完成之后,outer 设置值,同时执行回调方法 outer.set_value(results) for task in tasks: # 所有的 task 都添加回调方法 # task 中协程方法执行完成时, 在 step 方法中会抛出 StopIteration 异常 # 这时候 task 会执行 set_value 方法,同时会执行 _gather_cb 回调方法 task.add_done_callback(_gather_cb) # 将 Future 实例返回 return outer
以上获取事件循环单例,所有的事件都是在一个循环中执行。gather 方法使多个 task 并发执行,并且收集所有 task 的结果
def main(): os.system('mkdir -p pic') start = time.time() loop = get_event_loop() tasks = [] for url in urls: # 爬虫实例 crawler = Crawler(url) # 将 fetch 方法封装成 task 实例 tasks.append(Task(crawler.fetch())) # gather 方法将收集所有的 task 结果,并且返回 Futrue 实例, # Futrue 实例在 run_until_complete 方法中添加了 _run_until_complete_cb 回调方法 # 在所有的 task 执行结束之后,Future 实例执行 set_value 方法,同时执行回调方法 _run_until_complete_cb # 在 _run_until_complete_cb 方法中执行了 close 方法,无限循环结束,整个流程结束 # 返回 results 的值 results = loop.run_until_complete(gather(tasks)) for res in results: print(res) if __name__ == '__main__': main()
执行 spider_event_loop.py 文件
输出:
$ python3 spider_event_loop.py
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.055s
URL: https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.101s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.157s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.158s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.162s
URL: https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.164s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.163s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.164s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.224s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.492s
总共耗时: 0.495s
使用事件循环,异步执行爬取数据,消耗时间很少,性能很高。
现在我们把整个流程梳理一下:
- 从main方法开始执行
- 获取 EventLoop 实例,loop 实例是全局变量
- 将 urls 中的 url 分别进行处理,生成 Crawler 实例
- Crawler 的 fetch 方法是协程,不会立即执行,Task 包装协程方法
- 在 Task 的 __init__ 方法中, 会将 Task 的 step 方法,加入到 loop 的 队列中
- step 方法会被封装成 Handle 方法,加入到 loop 的 _ready 的队列中
- 在所有的 task 处理完成之后,加入到 tasks 列表中
- 接下来执行 gather 方法,outer 是 Future 实例, 所有的 task 实例都添加 _gather_cb 回调方法
- 在 _gather_cb 方法中,会收集 task 的结果
- 接下来执行 loop 的 run_until_complete 方法,入参是 gather 方法返回的 outer 的 Future 实例
- run_until_complete 方法中,outer 实例加入 _run_until_complete_cb 回调方法
- 开始执行 run_forever 方法,进入循环
- 执行 run_once 方法,获取所有的发生时间的文件描述符,由于现在还没有注册事件,所以事件列表为空
- 将 _ready 队列中所有的 handle 取出,开始执行,由于 task 的 __init__ 方法中将 step 放入了队列中,所以这里会执行 task 的 step 方法
- task 中的 coro 是 fetch 协程,所以在 step 方法中 coro 会执行 send 方法,由于 future 就是 task 实例,value 还是 None,所以会激活协程方法执行
- fetch 方法开始真正执行,创建 AsyncSocket 实例 sock
- 执行到 sock.connect((self.url.netloc, 80)) 会在 connect 方法中 创建新的 future 实例
- 进行 sock 请求连接,再将网络套接字的文件描述符注册写事件,在 add_writer 方法中 _connect_cb 封装成 handle 实例,在 事件发生之后会执行 handle 的 run 方法
- 暂停执行,返回 future 实例
- 由于是在 task 的 step 方法中激活的协程执行,所以 new_future 就是返回的 future 实例
- new_future 将 step 加入到回调列表中
- 现在流程都暂停了,等待事件的发生
- 在事件循环中,注册的文件描述符的写事件发生之后,_process_events 方法循环处理所有的事件,取出事件注册时候的 handle 方法,加入到 _ready 执行队列里面
- 再从 _ready 的队列里面取出 handle 方法,此 handle 方法是注册时候封装的 _connect_cb 回调方法
- 执行 _connect_cb 方法,参数是 future,也是返回的 new_future,这时候再执行 future 的 set_value 方法
- 设置 future 的值,并且将回调队列里的回调方法取出,加入到事件循环的 _ready 队列里,由于 new_future 将 step 方法加入了回调队列,所以会再次执行 task 的 step 方法
- step 又继续驱动协程执行,fetch 又开始了继续执行
- sock 发送数据,然后注册读事件,由于读取数据时一次不会全部读完,会多次注册读事件,读取全部的数据
- 读事件和写事件是相同的,在注册之后,就返回 future 对象,添加 step 回调方法,暂停执行。等待事件的发生
- 在所有的数据读取之后,fetch 方法会执行结束,return 当前的数据
- 由于 task 实例的 step 驱动的 fetch 方法,所以 step 方法会抛出 StopIteration 异常
- task 继承的 Future,所以可以执行 set_value 方法,异常的值就是 fetch 返回的值
- 在 gather 方法中,task 添加了 _gather_cb 回调方法,所以在 set_value 时,会调用回调方法
- 将 task 的值收集到 results 列表中,等所有的 task 都执行结束之后,outer 实例开始执行 set_value
- outer 添加了 _run_until_complete_cb 回调方法,所以这里同样会执行回调方法,在 _run_until_complete_cb 方法中调用事件循环的 close 方法,_stopped 设置为 True
- run_forever 退出无限循环,整个流程执行结束,将 outer 的值返回
整个流程梳理完了,Task 不断驱动协程执行,EventLoop 监听事件循环,又不断驱动 Task 执行,Future 在协程的通道中传输数据,几个部分配合合作完成整个流程。
以上就是简单理解Python中的事件循环EventLoop的详细内容,更多关于Python事件循环的资料请关注脚本之家其它相关文章!