python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python事件循环

简单理解Python中的事件循环EventLoop

作者:songcser

在 python 3中,加入了 asyncio 模块,来实现协程,其中一个很重要的概念是事件循环,本文我们就来自己实现一个相对简单的EventLoop,从而了解一下事件循环是如何进行运转的吧

简介

在 python 3中,加入了 asyncio 模块,来实现协程,其中一个很重要的概念是事件循环,整个异步流程都是事件循环推动的。下面自己实现一个相对简单的EventLoop,了解一下事件循环是如何进行运转的。

事件循环

下面看一下整个流程的实现过程

将以下代码写入 spider_event_loop.py 文件:

# spider_event_loop.py
import time
import os
import socket
from urllib.parse import urlparse
from collections import deque
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
# selector = DefaultSelector()
urls = ['https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image',
        'https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image',
        'https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image',
        'https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image'
]
# 事件循环实现
class EventLoop:
    def __init__(self):
     	# 整个流程是否
        self._stopped = False
        self._ready = deque()
        self._selector = DefaultSelector()
    # 向 _ready 队列中加入回调方法
    def call_soon(self, callback, *args):
        # 将事件添加到队列里
        handle = Handle(callback, self, *args)
        self._ready.append(handle)
    # 套接字注册读事件
    def add_writer(self, fd, callback, *args):
        # 将回调方法封装成Handle
        handle = Handle(callback, self, *args)
        try:
            # 检查有没有注册过
            key = self._selector.get_key(fd)
        except KeyError:
            # 没有注册过,进行写事件注册
            self._selector.register(fd, EVENT_WRITE, handle)
        else:
            # 注册过,添加写事件
            mask = key.events
            self._selector.modify(fd, mask | EVENT_WRITE, handle)
    # 套接字注销读事件
    def remove_writer(self, fd):
        try:
            # 检查有没有注册过
            self._selector.get_key(fd)
        except KeyError:
            # 没有,直接返回
            return
        # 注销事件
        self._selector.unregister(fd)
    # 套接字注册写事件
    def add_reader(self, fd, callback, *args):
        # 将回调方法,封装成Handle
        handle = Handle(callback, self, *args)
        try:
            # 检查是否注册过
            key = self._selector.get_key(fd)
        except KeyError:
            # 没有注册过,进行读事件注册
            self._selector.register(fd, EVENT_READ, handle)
        else:
            # 注册过,添加读事件
            mask = key.events
            self._selector.modify(fd, mask | EVENT_READ, handle)
    # 套接字注销写事件
    def remove_reader(self, fd):
        try:
            # 检查有没有注册过
            self._selector.get_key(fd)
        except KeyError:
            # 没有,直接返回
            return
        # 注销事件
        self._selector.unregister(fd)
    # 处理事件
    def _process_events(self, event_list):
        for key, mask in event_list:
            fileobj, handle = key.fileobj, key.data
            if mask & EVENT_READ:
                self.remove_reader(fileobj)
            if mask & EVENT_WRITE:
                self.remove_writer(fileobj)
            # 注册的事件发生,将注册时的handle放入队列中
            self._ready.append(handle)
    # 运行事件队列,执行回调方法
    def run_once(self):
        # 获取发生事件的所有文件描述符
        event_list = self._selector.select()
        self._process_events(event_list)
        while self._ready:
            # 将队列中的handle取出,执行回调方法
            handle = self._ready.popleft()
            handle.run()
    # 无限循环,直到stopped
    def run_forever(self):
        while True:
            # 执行事件
            self.run_once()
            if self._stopped:
                break
    # 执行协程方法,直到事件循环执行结束
    def run_until_complete(self, future):
        # 在 future 中添加回调方法,在 future set_value 的时候执行这个回调方法
        future.add_done_callback(self._run_until_complete_cb)
        # 开始无限循环
        self.run_forever()
        # 返回 future 的值
        return future.value
    # 结束时的回调方法
    def _run_until_complete_cb(self, future):
        # 结束无限循环
        self.close()
    # 关闭方法
    def close(self):
        self._stopped = True
    # 创建 Future 实例
    def create_future(self):
        return Future(loop=self)
    # 创建 Task 实例
    def create_task(self, coro):
        return Task(coro, loop=self)

以上类是一个简单是事件循环,事件循环基本的操作都已经包含,注册注销事件,无限循环获取文件描述符的事件,执行Handle队列,这里是使用的 _ready 队列,表示已经准备好的 Handle,由于没有延时事件所以省略了调度队列

# Handle 类,对回调方法做封装
class Handle:
    def __init__(self, callback, loop, *args):
        self._callback = callback
        self._args = args
    # 执行回调方法
    def run(self):
        self._callback(*self._args)

Handle 类对回调方法进行封装

# Future 类
class Future:
    def __init__(self, loop=None):
        self.value = None
        # 将要执行的回调方法
        self._step_func = []
        # 和事件循环关联
        if loop:
            self._loop = loop
        else:
            self._loop = get_event_loop()
    def add_done_callback(self, func):
        # 添加回调方法
        self._step_func.append(func)
    def set_value(self, value):
        # 设置值
        self.value = value
        for func in self._step_func:
            # 将回调方法添加到事件循环的 _ready 队列中
            # 在下次事件循环中执行回调方法
            self._loop.call_soon(func, self)
    # 实现 __iter__ 方法,Future 类的实例为可迭代对象
    def __iter__(self):
        # 该语句起到暂停协程的作用,并返回实例本身
        yield self
        # 该语句定义的返回值会赋给 yield from 语句等号前面的变量
        return self.value

Future类和以前的实现,区分不是很大,只是和 loop 做关联,将回调方法放入事件循环的队列中,依靠事件循环执行回调方法

# Task 类,继承 Future 类, 也是可迭代的
class Task(Future):
    def __init__(self, coro, loop=None):
        super().__init__(loop=loop)
        self.coro = coro
        # step 方法直接放入事件循环的队列中进行执行
        # 激活协程方法运行
        self._loop.call_soon(self.step, self)
    def step(self, future):
        try:
            # 向协程发送数据,驱动协程执行
            # 直到遇到 yield ,返回新的 Future实例
            new_futrue = self.coro.send(future.value)
        except StopIteration as exc:
            # 在协程方法执行到结束,或者 return 之后,抛出 StopIteration 异常
            # 由于 Task 也是 Future, 在协程执行完之后,最后 Task 执行自己的回调方法
            self.set_value(exc.value)
            return
        # 将 step 加入回调列表,等待下次驱动执行
        # 在 Future 执行 set_value 时,又会执行 step 方法,再次驱动协程执行
        new_futrue.add_done_callback(self.step)

Task 类继承 Future 类,类的实例也是可迭代对象,在 step 方法会不断触发协程继续执行,在协程执行结束之后,抛出 StopIteration 异常,最后 Task 类再执行回调方法,主要做一些清理工作或者收集结果。

class AsyncSocket:
    def __init__(self, loop=None):
        # 绑定事件循环
        if loop:
            self._loop = loop
        else:
            self._loop = get_event_loop()
        self.sock = socket.socket()
        self.sock.setblocking(False)
    # 该方法用于向服务器发送连接请求并注册监听套接字的可写事件
    def connect(self, address):
        # 由 loop 创建 Future 实例
        f = self._loop.create_future()
        try:
            self.sock.connect(address)
        except BlockingIOError:
            pass
        # 注册写事件,在连接成功事件发生之后,调用回调方法
        self._loop.add_writer(self.sock.fileno(), self._connect_cb, f)
        # 暂停执行,等待写事件发生
        yield from f
    # 回调方法,连接事件发生
    def _connect_cb(self, future):
        # 设置 Future 值,并且驱动协程继续执行
        future.set_value(None)
    # 向服务器发送获取图片的请求
    def send(self, data):
        self.sock.send(data)
    # 该方法会多次执行,以获取服务器返回的数据片段
    def read(self):
        f = self._loop.create_future()
        # 注册读事件,在可读事件发生之后,调用回调方法
        self._loop.add_reader(self.sock.fileno(), self._read_cb, f, self.sock)
        # 暂停执行,等待读事件发生
        yield from f
        # 返回最后的值
        return f.value
    # 回调方法,读事件发生
    def _read_cb(self, future, sock):
        # 套接字读取 4096 字节的数据,设置 Future 值,并且驱动协程继续执行
        future.set_value(sock.recv(4096))
    # 读取所有数据
    def read_all(self):
        data = b''
        while True:
            # 不断读取 sock 的数据
            value = yield from self.read()
            if value:
                data += value
            else:
                return data
    # 关闭客户端套接字
    def close(self):
        self.sock.close()

AsyncSocket 类是对 Socket 做的简单封装,绑定事件循环 Loop ,读写事件都是在 Loop 中注册,由事件循环来驱动 Socket 的读写。

# 爬虫类
class Crawler:
    def __init__(self, url):
        self._url = url
        self.url = urlparse(url)
        self.response = b''
    def fetch(self):
        self.time = time.time()
        # AsyncSocket 类的实例对象负责完成数据获取的工作
        sock = AsyncSocket()
        # 向服务器发送连接请求,协程会暂停到嵌套协程中的某个 yield from 处
        yield from sock.connect((self.url.netloc, 80))
        data = 'GET {0} HTTP/1.1\r\nHost: {1}\r\nConnection: close\r\n\r\n \
                '.format(self.url.path, self.url.netloc)
        # 发送请求数据
        sock.send(data.encode())
        # 读取全部的数据
        self.response = yield from sock.read_all()
        # 关闭 socket
        sock.close()
        # 将下载的图片写入文件
        with open('pic/{}'.format(self.url.path[1:].split('/')[-1]), 'wb') as f:
            f.write(self.response.split(b'\r\n\r\n')[1])
        return "URL: {0}, 耗时: {1:.3f}s".format(self._url, time.time() - self.time)

Crawler 完成图片爬取工作,在 fetch 方法中,AsyncSocket 完成请求连接,发送数据,接收数据的工作,整个流程非常的清晰,和同步阻塞模式流程基本相同,但是性能会有大量提升

# 事件循环,全局变量
_event_loop = None
# 获取事件循环,这里是要获取同一个全局实例
def get_event_loop():
    global _event_loop
    if _event_loop is None:
        # 生成一个新的事件循环实例
        _event_loop = EventLoop()
    return _event_loop
# 收集所有的 task
def gather(tasks, loop=None):
    # 使用 Future 类 收集 所有 tasks 的结果
    outer = Future(loop=loop)
    # tasks 数量
    count = len(tasks)
    nfinished = 0
    # 收集结果
    results = []
    # 回调方法
    def _gather_cb(f):
        nonlocal nfinished
        # 完成数量
        nfinished += 1
        # 收集结果
        results.append(f.value)
        if nfinished == count:
            # 都完成之后,outer 设置值,同时执行回调方法
            outer.set_value(results)
    for task in tasks:
        # 所有的 task 都添加回调方法
        # task 中协程方法执行完成时, 在 step 方法中会抛出 StopIteration 异常
        # 这时候 task 会执行 set_value 方法,同时会执行 _gather_cb 回调方法
        task.add_done_callback(_gather_cb)
    # 将 Future 实例返回
    return outer

以上获取事件循环单例,所有的事件都是在一个循环中执行。gather 方法使多个 task 并发执行,并且收集所有 task 的结果

def main():
    os.system('mkdir -p pic')
    start = time.time()
    loop = get_event_loop()
    tasks = []
    for url in urls:
        # 爬虫实例
        crawler = Crawler(url)
        # 将 fetch 方法封装成 task 实例
        tasks.append(Task(crawler.fetch()))        
    # gather 方法将收集所有的 task 结果,并且返回 Futrue 实例,
    # Futrue 实例在 run_until_complete 方法中添加了 _run_until_complete_cb 回调方法
    # 在所有的 task 执行结束之后,Future 实例执行 set_value 方法,同时执行回调方法 _run_until_complete_cb
    # 在 _run_until_complete_cb 方法中执行了 close 方法,无限循环结束,整个流程结束
    # 返回 results 的值
    results = loop.run_until_complete(gather(tasks))
    for res in results:
        print(res)
if __name__ == '__main__':
    main()

执行 spider_event_loop.py 文件

输出:

$ python3 spider_event_loop.py
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/459c6c73887a4206a33b13ef23988809~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.055s
URL: https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9bc51bc53f634bf79b5de5c8b9810817~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.101s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80cd2a1d165b43beb6234d893ce391e2~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.157s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0c4bd38c2fe3434587025748d051362e~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.158s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/80701ade82d345cd8aa0b08fef008fe1~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.162s
URL: https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ea18df95f26f4314a0a36d11d4a067d0~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.164s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d3769e4d468a4f64a1e2879f94ad742b~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.163s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/095c342a796c411cad1800396746bdaf~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.164s
URL: https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/2b95ac8571ba403180743495ed56e492~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.224s
URL: https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99855ed1f79c456f9fdcd83ce5cff4f2~tplv-k3u1fbpfcp-watermark.image, 耗时: 0.492s
总共耗时: 0.495s

使用事件循环,异步执行爬取数据,消耗时间很少,性能很高。

现在我们把整个流程梳理一下:

整个流程梳理完了,Task 不断驱动协程执行,EventLoop 监听事件循环,又不断驱动 Task 执行,Future 在协程的通道中传输数据,几个部分配合合作完成整个流程。

以上就是简单理解Python中的事件循环EventLoop的详细内容,更多关于Python事件循环的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文