Async IO在Python中的异步编程工作实例解析
作者:简讯Alfred
引言
许多程序员都熟悉编写顺序(同步)代码,在异步世界中,事件的发生独立于主程序流程。这意味着动作在后台执行,无需等待上一个动作完成。
换句话说,代码行是并发执行的。
想象一下,你有一些独立的任务,每个任务都需要大量的运行时间才能完成。他们的输出不相互依赖。所以,您想一次启动它们。如果这些任务按特定顺序执行,程序将不得不等待每个任务完成后再开始下一个任务。等待时间会阻塞程序。
异步编程范例有助于并发执行这些任务,并确保您可以克服等待时间并更有效地使用资源。
在 Python 中引入异步
Python 中引入了两个主要组件:
1.
async io
这是一个 Python 包,允许 API 运行和管理协程。2.
async/await
用来定义协程。
例如要进行 HTTP 调用,请考虑使用 aiohttp
,这是一个 Python 包,允许异步进行 HTTP 调用。在异步代码中,常用的 requests
库可能效果不是很好。
同样,如果您正在使用 Mongo 数据库,而不是依赖同步驱动程序(如 mongo-python
),您必须使用异步驱动程序(如 moto
异步访问 MongoDB)。
在异步世界中,一切都在事件循环中运行。这允许您一次运行多个协程。我们将在本教程中了解协程是什么。
里面的一切 async def
都是异步代码,其他一切都是同步的。
编写异步代码不像编写同步代码那么容易。Python 异步模型基于事件、回调、传输、协议和期货等概念。
异步如何工作
asyncio
库提供了两个关键字, async
和 await
.
让我们看一下这个 async hello-world 示例:
import asyncio async def hello(): print("Hello World!") await asyncio.sleep(1) print("Hello again!") asyncio.run(hello()) # Hello World! # Hello again!
乍一看你可能认为这是一个同步代码,因为第二次打印等待 1 秒打印Hello again!
在Hello World!
之后。但是这段代码实际上是异步的。
协程
任何定义为 async def
的函数都是像上面那样的协程。需要注意的是,调用 hello()
函数需要在 asyncio.run()
中执行,
为了运行协程,asyncio
提供了三种主要机制:
asyncio.run()
函数,它是启动事件循环并运行异步的主要入口点。
使用 await
协程的结果并将控制权传递给事件循环。
import asyncio import time async def say_something(delay, words): print(f"Before {words}") await asyncio.sleep(delay) print(f"After {words}") async def main(): print(f"Started: {time.strftime('%X')}") await say_something(1, "Task 1") await say_something(2, "Task 2") print(f"Finished: {time.strftime( '%X' )}") asyncio.run(main()) # Started:15:59:52 # Before Task 1 # After Task 1 # Before Task 2 # After Task 2 # Finished:15:59:55
前面的代码片段仍然等待 say_something()
协程完成,因此它在 1 秒内执行第一个任务,然后在等待 2 秒后执行第二个任务。
要让协程并发运行,我们应该创建任务,这是第三种机制。
asyncio.create_task()
用于安排协程执行的函数。
import asyncio import time async def say_something(delay, words): print(f"Before {words}") await asyncio.sleep(delay) print(f"After {words}") async def main(): print(f"Started: {time.strftime('%X')}") task1 = asyncio.create_task(say_something(1,"Task 1")) task2 = asyncio.create_task(say_something(2, "Task 2")) await task1 await task2 print(f"Finished: {time.strftime('%X')}") asyncio.run(main()) # Started:16:07:35 # Before Task 1 # Before Task 2 # After Task 1 # After Task 2 # Finished:16:07:37
上面的代码现在并发运行,say_something()
协程不再等待 say_something()
协程完成。而是同时运行具有不同参数的同一个协程。
发生的情况如下:
say_something()
协程从参数的第一个任务(1 秒和一个字符串Task 1
)开始。这个任务叫做 task1
。
然后它会暂停协程的执行并等待 1 秒让 say_something()
协程在遇到 await
关键字时完成。它将控制权返回给事件循环。
同样对于第二个任务,它会暂停协程的执行并等待 2 秒让 say_something()
协程完成,因为它遇到 await
关键字。
task1
控制返回到事件循环后,事件循环继续执行第二个任务 task2
因为 asyncio.sleep()
还没有完成。
asyncio.create_task()
包装 say_something()
函数并使其作为异步任务同时运行协程。 如您所见,上面的代码片段显示它的运行速度比以前快了 1 秒。
当 asyncio.create_task()
被调用时,协程会自动安排在事件循环中运行。
任务可以帮助您并发运行多个协程,但这并不是实现并发的唯一方法。
使用 asyncio.gather() 运行并发任务
另一种同时运行多个协程的方法是使用 asyncio.gather()
函数。此函数将协程作为参数并并发运行它们。
import asyncio import time async def greetings(): print("Welcome") await asyncio.sleep(1) print("Goodbye") async def main(): start = time.time() await asyncio.gather(greetings(), greetings()) elapsed = time.time() - start print(f"{__name__} executed in {elapsed:0.2f} seconds.") asyncio.run(main()) # Welcome # Welcome # Goodbye # Goodbye # __main__ executed in 1.00 seconds.
在前面的代码中,greetings()
协程被并发执行了两次。
等待对象
如果一个对象可以与 await
关键字一起使用,则该对象称为可等待对象。可等待对象主要有 3 种类型:coroutines
、tasks
和futures
。
coroutines
import asyncio async def mult(first, second): print("Calculating multiplication...") await asyncio.sleep(1) mul = first * second print(f"{first} multiplied by {second} is {mul}") return mul async def add(first, second): print("Calculating sum...") await asyncio.sleep(1) sum = first + second print(f"Sum of {first} and {second} is {sum}") return sum async def main(first, second): await mult(first, second) await add(first, second) asyncio.run(main(10, 20)) # Calculating multiplication... # 10 multiplied by 20 is 200 # Calculating sum... # Sum of 10 and 20 is 30
在前面的示例中, 协同程序 main()
等待 mult()
和 add()
结束。
假设您在 mult
协程之前省略了 await
关键字。然后您将收到以下错误: RuntimeWarning: coroutine 'mult' was never awaited.
。
tasks
要安排协程在事件循环中运行,我们使用 asyncio.create_task()
函数。
import asyncio async def mult(first, second): print("Calculating multiplication...") await asyncio.sleep(1) mul = first * second print(f"{first} multiplied by {second} is {mul}") return mul async def add(first, second): print("Calculating sum...") await asyncio.sleep(1) sum = first + second print(f"Sum of {first} and {second} is {sum}") return sum async def main(first, second): mult_task = asyncio.create_task(mult(first, second)) add_task = asyncio.create_task(add(first, second)) await mult_task await add_task asyncio.run(main(10, 20)) # Calculating multiplication... # Calculating sum... # 10 multiplied by 20 is 200 # Sum of 10 and 20 is 30
futures
Future 是表示异步计算结果的低级可等待对象。它是通过调用 asyncio.Future()
函数创建的。
from asyncio import Future future = Future() print(future.done()) print(future.cancelled()) future.cancel() print(future.done()) print(future.cancelled()) # False # False # True # True
超时
用于 asyncio.wait_for(aw, timeout, *)
设置等待对象完成的超时。请注意, aw
这里是可等待的对象。如果要在可等待对象完成时间过长时引发异常,这将很有用。作为异常 asyncio.TimeoutError
。
import asyncio async def slow_operation(): await asyncio.sleep(400) print("Completed.") async def main(): try: await asyncio.wait_for(slow_operation(), timeout=1.0) except asyncio.TimeoutError: print("Timed out!") asyncio.run(main()) # Timed out!
尽管协程需要 400 秒才能完成,但 slow_operation()
中的超时 Future 设置为 1 秒。