python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python asyncio完全指南

Python标准库asyncio用法完全指南

作者:transformer变压器

Python asyncio是构建高性能异步应用程序的终极工具,通过协程机制实现真正的并发编程,下面这篇文章主要介绍了Python标准库asyncio用法的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下

什么是 asyncio

asyncio 是 Python 3.4+ 引入的标准库,用于编写并发代码的异步 I/O 框架。它使用事件循环和协程实现单线程并发,特别适合处理 I/O 密集型任务。

核心优势:

核心概念

协程 (Coroutine)

使用 async def 定义的特殊函数,可以在执行过程中暂停和恢复。

async def my_coroutine():
    print("开始执行")
    await asyncio.sleep(1)
    print("执行完成")

事件循环 (Event Loop)

asyncio 的核心,负责调度和执行协程。

await 关键字

用于等待异步操作完成,只能在 async 函数内使用。

Task

对协程的封装,可以并发执行多个协程。

Future

表示一个异步操作的最终结果。

基础用法

1. 运行协程的三种方式

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 方式1: Python 3.7+ 推荐
asyncio.run(hello())

# 方式2: 手动管理事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()

# 方式3: 在已有事件循环中
# await hello()  # 只能在异步函数内使用

2. 创建和管理任务

async def task1():
    await asyncio.sleep(2)
    return "任务1完成"

async def task2():
    await asyncio.sleep(1)
    return "任务2完成"

async def main():
    # 创建任务
    t1 = asyncio.create_task(task1())
    t2 = asyncio.create_task(task2())
    
    # 等待所有任务完成
    results = await asyncio.gather(t1, t2)
    print(results)

asyncio.run(main())

3. 并发执行

async def fetch_data(n):
    print(f"开始获取数据 {n}")
    await asyncio.sleep(1)
    print(f"完成获取数据 {n}")
    return f"数据 {n}"

async def main():
    # gather: 并发执行,按顺序返回结果
    results = await asyncio.gather(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3)
    )
    print(results)

asyncio.run(main())

内部机理

事件循环的工作原理

┌─────────────────────────────────┐
│      事件循环 (Event Loop)       │
│                                 │
│  ┌──────────────────────────┐  │
│  │   就绪队列 (Ready Queue)  │  │
│  │   [task1, task2, task3]  │  │
│  └──────────────────────────┘  │
│                                 │
│  ┌──────────────────────────┐  │
│  │  等待队列 (Wait Queue)    │  │
│  │  [task4, task5]          │  │
│  └──────────────────────────┘  │
│                                 │
│  ┌──────────────────────────┐  │
│  │     I/O 选择器           │  │
│  │  (epoll/kqueue/select)   │  │
│  └──────────────────────────┘  │
└─────────────────────────────────┘

执行流程

  1. 初始化: 创建事件循环
  2. 注册协程: 将协程包装成 Task 对象
  3. 调度执行:
    • 从就绪队列取出 Task
    • 执行到 await 处暂停
    • 注册到相应的等待队列
  4. I/O 多路复用: 监听 I/O 事件
  5. 唤醒协程: I/O 完成后,将 Task 移回就绪队列
  6. 循环往复: 直到所有任务完成

协程状态转换

async def example():
    print("1. 开始执行")      # RUNNING
    await asyncio.sleep(1)    # WAITING (挂起)
    print("2. 继续执行")      # RUNNING (恢复)
    return "完成"             # FINISHED

底层实现关键点

# 简化的事件循环伪代码
class EventLoop:
    def __init__(self):
        self._ready = deque()  # 就绪队列
        self._selector = select.epoll()  # I/O 选择器
    
    def run_until_complete(self, coro):
        task = Task(coro)
        self._ready.append(task)
        
        while self._ready or self._has_pending_io():
            # 执行就绪的任务
            if self._ready:
                task = self._ready.popleft()
                task.step()
            
            # 等待 I/O 事件
            events = self._selector.select(timeout=0)
            for event in events:
                self._ready.append(event.callback)

与多线程/多进程的区别

对比表格

特性asyncio多线程 (threading)多进程 (multiprocessing)
并发模型协作式并发抢占式并发真正并行
运行环境单线程多线程多进程
GIL 影响无影响受 GIL 限制不受 GIL 限制
切换开销极小(用户态)较大(内核态)最大(进程切换)
内存占用中等
适用场景I/O 密集型I/O 密集型CPU 密集型
数据共享简单(同一线程)需要锁需要 IPC
调试难度容易困难中等

使用场景详解

asyncio 适合的场景

# 典型的 asyncio 场景: 并发网络请求
import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['http://example.com'] * 100
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    print(f"完成 {len(results)} 个请求")

asyncio.run(main())

多线程适合的场景

import threading
import requests

def fetch_url(url):
    response = requests.get(url)
    print(f"完成: {url}")

urls = ['http://example.com'] * 10
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]

for t in threads:
    t.start()
for t in threads:
    t.join()

多进程适合的场景

from multiprocessing import Pool

def cpu_intensive(n):
    return sum(i*i for i in range(n))

with Pool(4) as pool:
    results = pool.map(cpu_intensive, [10000000] * 4)
    print(results)

性能对比示例

import asyncio
import threading
import multiprocessing
import time

# I/O 密集型任务
def io_bound_sync():
    time.sleep(1)

async def io_bound_async():
    await asyncio.sleep(1)

# 测试 asyncio
async def test_asyncio():
    start = time.time()
    await asyncio.gather(*[io_bound_async() for _ in range(100)])
    print(f"asyncio: {time.time() - start:.2f}s")  # 约 1 秒

# 测试多线程
def test_threading():
    start = time.time()
    threads = [threading.Thread(target=io_bound_sync) for _ in range(100)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(f"threading: {time.time() - start:.2f}s")  # 约 1-2 秒

# asyncio 在 I/O 密集型任务中表现最优

高级特性

1. 异步上下文管理器

class AsyncResource:
    async def __aenter__(self):
        print("获取资源")
        await asyncio.sleep(1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("释放资源")
        await asyncio.sleep(1)

async def main():
    async with AsyncResource() as resource:
        print("使用资源")

asyncio.run(main())

2. 异步迭代器

class AsyncRange:
    def __init__(self, start, end):
        self.current = start
        self.end = end
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.current += 1
        return self.current - 1

async def main():
    async for i in AsyncRange(0, 5):
        print(i)

asyncio.run(main())

3. 异步生成器

async def async_generator():
    for i in range(5):
        await asyncio.sleep(1)
        yield i

async def main():
    async for value in async_generator():
        print(value)

asyncio.run(main())

4. 超时控制

async def slow_operation():
    await asyncio.sleep(5)
    return "完成"

async def main():
    try:
        # 设置 2 秒超时
        result = await asyncio.wait_for(slow_operation(), timeout=2)
    except asyncio.TimeoutError:
        print("操作超时")

asyncio.run(main())

5. 信号量和锁

# 限制并发数量
async def limited_task(sem, n):
    async with sem:
        print(f"任务 {n} 开始")
        await asyncio.sleep(1)
        print(f"任务 {n} 完成")

async def main():
    sem = asyncio.Semaphore(3)  # 最多 3 个并发
    await asyncio.gather(*[limited_task(sem, i) for i in range(10)])

asyncio.run(main())

6. 队列

async def producer(queue, n):
    for i in range(n):
        await queue.put(i)
        print(f"生产: {i}")
        await asyncio.sleep(0.5)

async def consumer(queue, name):
    while True:
        item = await queue.get()
        print(f"{name} 消费: {item}")
        await asyncio.sleep(1)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    
    # 创建生产者和消费者
    producers = [asyncio.create_task(producer(queue, 5))]
    consumers = [asyncio.create_task(consumer(queue, f"消费者{i}")) 
                 for i in range(2)]
    
    await asyncio.gather(*producers)
    await queue.join()  # 等待所有任务处理完成
    
    for c in consumers:
        c.cancel()

asyncio.run(main())

实战示例

示例1: 异步网络爬虫

import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch_page(session, url):
    try:
        async with session.get(url, timeout=10) as response:
            return await response.text()
    except Exception as e:
        print(f"错误 {url}: {e}")
        return None

async def parse_page(html):
    if html:
        soup = BeautifulSoup(html, 'html.parser')
        return soup.title.string if soup.title else "无标题"
    return None

async def crawl_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_page(session, url) for url in urls]
        pages = await asyncio.gather(*tasks)
        
        titles = await asyncio.gather(*[parse_page(page) for page in pages])
        return titles

urls = [
    'http://example.com',
    'http://example.org',
    'http://example.net',
]

# 执行爬虫
# titles = asyncio.run(crawl_urls(urls))
# print(titles)

示例2: 异步数据库操作

import asyncio
import aiosqlite

async def create_table(db):
    await db.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT,
            email TEXT
        )
    ''')
    await db.commit()

async def insert_user(db, name, email):
    await db.execute(
        'INSERT INTO users (name, email) VALUES (?, ?)',
        (name, email)
    )
    await db.commit()

async def fetch_users(db):
    async with db.execute('SELECT * FROM users') as cursor:
        return await cursor.fetchall()

async def main():
    async with aiosqlite.connect('test.db') as db:
        await create_table(db)
        
        # 并发插入
        await asyncio.gather(
            insert_user(db, 'Alice', 'alice@example.com'),
            insert_user(db, 'Bob', 'bob@example.com'),
            insert_user(db, 'Charlie', 'charlie@example.com')
        )
        
        users = await fetch_users(db)
        for user in users:
            print(user)

# asyncio.run(main())

示例3: 异步 Web 服务器

import asyncio

async def handle_client(reader, writer):
    data = await reader.read(1024)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print(f"收到来自 {addr} 的消息: {message}")
    
    response = f"Echo: {message}"
    writer.write(response.encode())
    await writer.drain()
    
    writer.close()
    await writer.wait_closed()

async def start_server():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888
    )
    
    addr = server.sockets[0].getsockname()
    print(f"服务器启动在 {addr}")
    
    async with server:
        await server.serve_forever()

# asyncio.run(start_server())

示例4: 实时数据流处理

import asyncio
import random

async def data_stream():
    """模拟数据流"""
    while True:
        yield random.randint(1, 100)
        await asyncio.sleep(0.5)

async def process_data(value):
    """处理数据"""
    await asyncio.sleep(0.1)
    return value * 2

async def monitor_stream():
    """监控和处理数据流"""
    buffer = []
    
    async for data in data_stream():
        print(f"接收数据: {data}")
        
        # 异步处理数据
        task = asyncio.create_task(process_data(data))
        buffer.append(task)
        
        # 每 5 个数据批量处理
        if len(buffer) >= 5:
            results = await asyncio.gather(*buffer)
            print(f"处理结果: {results}")
            buffer = []
        
        # 演示用,处理 20 个数据后停止
        if data > 20:
            break

# asyncio.run(monitor_stream())

示例5: 多任务协调

import asyncio

async def task_with_priority(name, priority, duration):
    print(f"[优先级 {priority}] {name} 开始")
    await asyncio.sleep(duration)
    print(f"[优先级 {priority}] {name} 完成")
    return f"{name} 结果"

async def coordinator():
    # 创建不同优先级的任务
    high_priority = [
        task_with_priority(f"高优先级-{i}", 1, 1) 
        for i in range(3)
    ]
    
    low_priority = [
        task_with_priority(f"低优先级-{i}", 3, 2) 
        for i in range(3)
    ]
    
    # 先执行高优先级任务
    high_results = await asyncio.gather(*high_priority)
    print(f"高优先级完成: {high_results}")
    
    # 再执行低优先级任务
    low_results = await asyncio.gather(*low_priority)
    print(f"低优先级完成: {low_results}")

asyncio.run(coordinator())

最佳实践

1. 错误处理

async def safe_task(n):
    try:
        if n == 3:
            raise ValueError("错误的值")
        await asyncio.sleep(1)
        return f"任务 {n} 成功"
    except Exception as e:
        print(f"任务 {n} 失败: {e}")
        return None

async def main():
    results = await asyncio.gather(
        safe_task(1),
        safe_task(2),
        safe_task(3),
        return_exceptions=True  # 不会因为一个任务失败而停止
    )
    print(results)

asyncio.run(main())

2. 资源清理

class AsyncConnection:
    async def __aenter__(self):
        self.conn = await self.connect()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
    
    async def connect(self):
        print("建立连接")
        await asyncio.sleep(1)
        return "connection"
    
    async def close(self):
        print("关闭连接")
        await asyncio.sleep(1)

async def main():
    async with AsyncConnection() as conn:
        print(f"使用连接: {conn.conn}")

asyncio.run(main())

3. 避免阻塞

import asyncio
from concurrent.futures import ThreadPoolExecutor

# 错误: 阻塞操作
async def bad_example():
    time.sleep(5)  # 这会阻塞整个事件循环!

# 正确: 使用 executor 运行阻塞操作
async def good_example():
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor() as executor:
        result = await loop.run_in_executor(executor, blocking_function)
    return result

def blocking_function():
    import time
    time.sleep(5)
    return "完成"

4. 合理设置超时

async def with_timeout():
    try:
        result = await asyncio.wait_for(
            long_running_task(),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("任务超时,执行回退逻辑")
        result = "默认值"
    return result

5. 使用 TaskGroup (Python 3.11+)

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro())
        task2 = tg.create_task(another_coro())
    
    # 所有任务完成或某个任务失败时退出
    print("所有任务完成")

总结

asyncio 的核心优势

  1. 高性能: 单线程处理大量并发,避免线程切换开销
  2. 低资源消耗: 协程比线程更轻量,内存占用少
  3. 代码可读性: async/await 语法让异步代码像同步代码一样易读
  4. 丰富的生态: aiohttp、aiopg、aiomysql 等大量异步库支持

何时使用 asyncio

✅ 适合使用:

❌ 不适合使用:

学习路径建议

  1. 基础阶段: 掌握 async/await、事件循环、Task 基本概念
  2. 进阶阶段: 学习异步上下文管理器、生成器、同步原语
  3. 实战阶段: 使用 aiohttp、aiopg 等库构建实际项目
  4. 优化阶段: 性能分析、错误处理、最佳实践

常见陷阱

参考资源

结语: asyncio 是 Python 异步编程的强大工具,掌握它可以显著提升 I/O 密集型应用的性能。理解其内部机制和最佳实践,能帮助你写出高效、可维护的异步代码。

总结

到此这篇关于Python标准库asyncio用法完全指南的文章就介绍到这了,更多相关Python asyncio完全指南内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文