python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python asyncio异步编程

Python中asyncio库实现异步编程的示例

作者:彬彬侠

本文主要介绍了Python中asyncio库实现异步编程的示例

Python 的 asyncio 库是标准库的一部分,引入于 Python 3.4(通过 PEP 3156),用于实现异步编程。它提供了一种基于事件循环(Event Loop)的并发机制,适合处理 I/O 密集型任务(如网络请求、文件操作、数据库查询等)。asyncio 通过协程(coroutines)、任务(tasks)和事件循环,允许程序在等待 I/O 操作时执行其他任务,从而提高效率。

以下是对 asyncio 库的详细说明和常见用法。

1. asyncio 库的作用

2. 核心概念

3. 基本用法

以下是 asyncio 的基本用法,展示如何定义和运行协程。

3.1 定义和运行协程

import asyncio

# 定义协程
async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟异步 I/O 操作
    print("World")

# 运行协程
async def main():
    await asyncio.gather(say_hello(), say_hello())  # 并发运行多个协程

# 执行事件循环
if __name__ == "__main__":
    asyncio.run(main())

输出:

Hello
Hello
World
World

说明:

3.2 事件循环

事件循环是 asyncio 的核心,负责调度协程和处理回调。以下是手动操作事件循环的示例:

import asyncio

async def task():
    print("Task started")
    await asyncio.sleep(1)
    print("Task finished")

loop = asyncio.get_event_loop()  # 获取事件循环
try:
    loop.run_until_complete(task())  # 运行协程直到完成
finally:
    loop.close()  # 关闭事件循环

说明:

4. 常用功能

asyncio 提供了丰富的 API,以下是常见功能和用法。

4.1 并发运行多个协程

使用 asyncio.gather() 或 asyncio.create_task() 实现并发:

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

async def main():
    # 使用 gather 并发运行
    await asyncio.gather(task1(), task2())
    # 或者使用 create_task
    t1 = asyncio.create_task(task1())
    t2 = asyncio.create_task(task2())
    await t1
    await t2

asyncio.run(main())

输出:

Task 1 started
Task 2 started
Task 2 finished
Task 1 finished

说明:

4.2 超时控制

使用 asyncio.wait_for() 为协程设置超时:

import asyncio

async def long_task():
    await asyncio.sleep(5)
    print("Task completed")

async def main():
    try:
        await asyncio.wait_for(long_task(), timeout=2)  # 2 秒超时
    except asyncio.TimeoutError:
        print("Task timed out")

asyncio.run(main())

输出:

Task timed out

4.3 异步迭代

asyncio 支持异步迭代器和异步上下文管理器:

import asyncio

async def async_generator():
    for i in range(3):
        await asyncio.sleep(1)
        yield i

async def main():
    async for value in async_generator():
        print(f"Received: {value}")

asyncio.run(main())

输出:

Received: 0
Received: 1
Received: 2

4.4 异步上下文管理器

使用 async with 管理资源:

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def resource():
    print("Resource acquired")
    try:
        yield
    finally:
        print("Resource released")

async def main():
    async with resource():
        print("Using resource")
        await asyncio.sleep(1)

asyncio.run(main())

输出:

Resource acquired
Using resource
Resource released

5. 与外部库集成

asyncio 常与异步库结合使用,例如:

aiohttp:异步 HTTP 客户端/服务器。

import aiohttp
import asyncio

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    html = await fetch_url("https://example.com")
    print(html[:100])

asyncio.run(main())

6. 高级功能

6.1 任务取消

可以取消正在运行的任务:

import asyncio

async def long_task():
    try:
        print("Task started")
        await asyncio.sleep(10)
        print("Task finished")
    except asyncio.CancelledError:
        print("Task was cancelled")
        raise

async def main():
    task = asyncio.create_task(long_task())
    await asyncio.sleep(1)
    task.cancel()  # 取消任务
    try:
        await task
    except asyncio.CancelledError:
        print("Main caught cancellation")

asyncio.run(main())

输出:

Task started
Task was cancelled
Main caught cancellation

6.2 同步与异步混合

使用 loop.run_in_executor() 将同步代码(阻塞操作)运行在线程池或进程池中:

import asyncio
import time

def blocking_task():
    time.sleep(1)  # 模拟阻塞操作
    return "Done"

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_task)  # 在默认线程池运行
    print(result)

asyncio.run(main())

输出:

Done

6.3 自定义事件循环

可以自定义事件循环策略,例如使用 uvloop(高性能事件循环):

import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 后续代码使用 uvloop 作为事件循环

安装 uvloop:

pip install uvloop

7. 实际应用场景

8. 注意事项

9. 综合示例

以下是一个综合示例,展示异步爬虫的实现:

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://example.com",
        "https://python.org",
        "https://github.com"
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                print(f"Failed to fetch {url}: {result}")
            else:
                print(f"Fetched {url}: {len(result)} bytes")

if __name__ == "__main__":
    asyncio.run(main())

输出示例:

Fetched https://example.com: 1256 bytes
Fetched https://python.org: 50342 bytes
Fetched https://github.com: 123456 bytes

说明:

到此这篇关于Python中asyncio库实现异步编程的示例的文章就介绍到这了,更多相关Python asyncio异步编程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文