python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python Async IO异步编程

Async IO在Python中的异步编程工作实例解析

作者:简讯Alfred

这篇文章主要为大家介绍了Async IO在Python中的异步编程工作实例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

许多程序员都熟悉编写顺序(同步)代码,在异步世界中,事件的发生独立于主程序流程。这意味着动作在后台执行,无需等待上一个动作完成。

换句话说,代码行是并发执行的。

想象一下,你有一些独立的任务,每个任务都需要大量的运行时间才能完成。他们的输出不相互依赖。所以,您想一次启动它们。如果这些任务按特定顺序执行,程序将不得不等待每个任务完成后再开始下一个任务。等待时间会阻塞程序。

异步编程范例有助于并发执行这些任务,并确保您可以克服等待时间并更有效地使用资源。

在 Python 中引入异步

Python 中引入了两个主要组件:

例如要进行 HTTP 调用,请考虑使用 aiohttp ,这是一个 Python 包,允许异步进行 HTTP 调用。在异步代码中,常用的 requests 库可能效果不是很好。

同样,如果您正在使用 Mongo 数据库,而不是依赖同步驱动程序(如 mongo-python ),您必须使用异步驱动程序(如 moto 异步访问 MongoDB)。

在异步世界中,一切都在事件循环中运行。这允许您一次运行多个协程。我们将在本教程中了解协程是什么。

里面的一切 async def 都是异步代码,其他一切都是同步的。

编写异步代码不像编写同步代码那么容易。Python 异步模型基于事件、回调、传输、协议和期货等概念。

异步如何工作

asyncio 库提供了两个关键字, async 和 await .

让我们看一下这个 async hello-world 示例:

import asyncio
async def hello():
    print("Hello World!")
    await asyncio.sleep(1)
    print("Hello again!")
asyncio.run(hello())
# Hello World!
# Hello again!

乍一看你可能认为这是一个同步代码,因为第二次打印等待 1 秒打印Hello again! 在Hello World!之后。但是这段代码实际上是异步的。

协程

任何定义为 async def 的函数都是像上面那样的协程。需要注意的是,调用 hello() 函数需要在 asyncio.run() 中执行,

为了运行协程,asyncio 提供了三种主要机制:

asyncio.run() 函数,它是启动事件循环并运行异步的主要入口点。

使用 await 协程的结果并将控制权传递给事件循环。

import asyncio
import time
async def say_something(delay, words):
    print(f"Before {words}")
    await asyncio.sleep(delay)
    print(f"After {words}")
async def main():
    print(f"Started: {time.strftime('%X')}") 
    await say_something(1, "Task 1")
    await say_something(2, "Task 2")
    print(f"Finished: {time.strftime( '%X' )}")
asyncio.run(main())
# Started:15:59:52
# Before Task 1
# After Task 1
# Before Task 2
# After Task 2
# Finished:15:59:55

前面的代码片段仍然等待 say_something() 协程完成,因此它在 1 秒内执行第一个任务,然后在等待 2 秒后执行第二个任务。

要让协程并发运行,我们应该创建任务,这是第三种机制。

asyncio.create_task() 用于安排协程执行的函数。

import asyncio
import time
async def say_something(delay, words):
    print(f"Before {words}")
    await asyncio.sleep(delay)
    print(f"After {words}")
async def main():
    print(f"Started: {time.strftime('%X')}")
    task1 = asyncio.create_task(say_something(1,"Task 1")) 
    task2 = asyncio.create_task(say_something(2, "Task 2")) 
    await task1
    await task2
    print(f"Finished: {time.strftime('%X')}")
asyncio.run(main())
# Started:16:07:35
# Before Task 1
# Before Task 2
# After Task 1
# After Task 2
# Finished:16:07:37

上面的代码现在并发运行,say_something() 协程不再等待 say_something() 协程完成。而是同时运行具有不同参数的同一个协程。

发生的情况如下:

say_something() 协程从参数的第一个任务(1 秒和一个字符串Task 1)开始。这个任务叫做 task1

然后它会暂停协程的执行并等待 1 秒让 say_something() 协程在遇到 await 关键字时完成。它将控制权返回给事件循环。

同样对于第二个任务,它会暂停协程的执行并等待 2 秒让 say_something() 协程完成,因为它遇到 await 关键字。

task1 控制返回到事件循环后,事件循环继续执行第二个任务 task2 因为 asyncio.sleep() 还没有完成。

asyncio.create_task() 包装 say_something() 函数并使其作为异步任务同时运行协程。 如您所见,上面的代码片段显示它的运行速度比以前快了 1 秒。

当 asyncio.create_task() 被调用时,协程会自动安排在事件循环中运行。

任务可以帮助您并发运行多个协程,但这并不是实现并发的唯一方法。

使用 asyncio.gather() 运行并发任务

另一种同时运行多个协程的方法是使用 asyncio.gather() 函数。此函数将协程作为参数并并发运行它们。

import asyncio
import time
async def greetings():
    print("Welcome")
    await asyncio.sleep(1)
    print("Goodbye")
async def main():
    start = time.time()
    await asyncio.gather(greetings(), greetings())
    elapsed = time.time() - start
    print(f"{__name__} executed in {elapsed:0.2f} seconds.")
asyncio.run(main())
# Welcome
# Welcome
# Goodbye
# Goodbye
# __main__ executed in 1.00 seconds.

在前面的代码中,greetings() 协程被并发执行了两次。

等待对象

如果一个对象可以与 await 关键字一起使用,则该对象称为可等待对象。可等待对象主要有 3 种类型:coroutinestasksfutures

coroutines

import asyncio
async def mult(first, second):
    print("Calculating multiplication...")
    await asyncio.sleep(1)
    mul = first * second
    print(f"{first} multiplied by {second} is {mul}")
    return mul
async def add(first, second):
    print("Calculating sum...")
    await asyncio.sleep(1)
    sum = first + second
    print(f"Sum of {first} and {second} is {sum}")
    return sum
async def main(first, second):
    await mult(first, second)
    await add(first, second)
asyncio.run(main(10, 20))
# Calculating multiplication...
# 10 multiplied by 20 is 200
# Calculating sum...
# Sum of 10 and 20 is 30

在前面的示例中, 协同程序 main() 等待 mult() 和 add() 结束。

假设您在 mult 协程之前省略了 await 关键字。然后您将收到以下错误: RuntimeWarning: coroutine 'mult' was never awaited.

tasks

要安排协程在事件循环中运行,我们使用 asyncio.create_task() 函数。

import asyncio
async def mult(first, second):
    print("Calculating multiplication...")
    await asyncio.sleep(1)
    mul = first * second
    print(f"{first} multiplied by {second} is {mul}")
    return mul
async def add(first, second):
    print("Calculating sum...")
    await asyncio.sleep(1)
    sum = first + second
    print(f"Sum of {first} and {second} is {sum}")
    return sum
async def main(first, second):
    mult_task = asyncio.create_task(mult(first, second))
    add_task = asyncio.create_task(add(first, second))
    await mult_task
    await add_task
asyncio.run(main(10, 20))
# Calculating multiplication...
# Calculating sum...
# 10 multiplied by 20 is 200
# Sum of 10 and 20 is 30

futures

Future 是表示异步计算结果的低级可等待对象。它是通过调用 asyncio.Future() 函数创建的。

from asyncio import Future
future = Future()
print(future.done())
print(future.cancelled())
future.cancel()
print(future.done())
print(future.cancelled())
# False
# False
# True
# True

超时

用于 asyncio.wait_for(aw, timeout, *) 设置等待对象完成的超时。请注意, aw 这里是可等待的对象。如果要在可等待对象完成时间过长时引发异常,这将很有用。作为异常 asyncio.TimeoutError

import asyncio
async def slow_operation():
    await asyncio.sleep(400)
    print("Completed.")
async def main():
    try:
        await asyncio.wait_for(slow_operation(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timed out!")
asyncio.run(main())
# Timed out!

尽管协程需要 400 秒才能完成,但 slow_operation() 中的超时 Future 设置为 1 秒。

您可能感兴趣的文章:
阅读全文