python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > python asyncio用法

由浅入深介绍python asyncio的各种用法与代码示例

作者:令狐掌门

asyncio 是 Python 的一个库,用于编写并发代码,使用协程、任务和 Futures 来处理 I/O 密集型和高延迟操作,下面小编就为大家由浅入深介绍python asyncio的各种用法吧

下面是一份“由浅入深”的 asyncio 实战手册。先解释核心概念,再给出可直接运行的、循序渐进的示例代码。全部示例都只依赖标准库(除少数标注处),适配 Python 3.10+(如使用 TaskGroupasyncio.timeout() 的段落需要 3.11+)。

1. 基础概念速览

2. 最小可运行示例:async/await+asyncio.run

# demo_basic.py
import asyncio

async def io_job(name, delay):
    print(f"[{name}] start")
    await asyncio.sleep(delay)  # 模拟I/O等待
    print(f"[{name}] done after {delay}s")
    return name, delay

async def main():
    r1 = await io_job("A", 1)
    r2 = await io_job("B", 2)  # 串行等待,总耗时约3秒
    print("results:", r1, r2)

if __name__ == "__main__":
    asyncio.run(main())

3. 并发执行:create_task、gather、as_completed

# demo_concurrency.py
import asyncio, random

async def fetch(i):
    d = random.uniform(0.5, 2.0)
    await asyncio.sleep(d)
    return f"task-{i}", round(d, 2)

async def main():
    # 3.1 create_task + await
    t1 = asyncio.create_task(fetch(1))
    t2 = asyncio.create_task(fetch(2))
    r1 = await t1
    r2 = await t2
    print("create_task results:", r1, r2)

    # 3.2 gather(顺序返回结果;遇异常默认会立刻抛出)
    results = await asyncio.gather(*(fetch(i) for i in range(3, 8)))
    print("gather results:", results)

    # 3.3 as_completed(谁先完成先拿谁)
    tasks = [asyncio.create_task(fetch(i)) for i in range(8, 13)]
    for fut in asyncio.as_completed(tasks):
        print("as_completed:", await fut)

if __name__ == "__main__":
    asyncio.run(main())

4. 超时控制与取消:wait_for、asyncio.timeout、Task.cancel

# demo_timeout_cancel.py
import asyncio

async def slow():
    try:
        await asyncio.sleep(5)
        return "ok"
    except asyncio.CancelledError:
        print("slow() got cancelled!")
        raise

async def main():
    # 4.1 wait_for(3.8+)
    try:
        res = await asyncio.wait_for(slow(), timeout=2)
        print("result:", res)
    except asyncio.TimeoutError:
        print("wait_for timeout!")

    # 4.2 asyncio.timeout 上下文(3.11+)
    try:
        async with asyncio.timeout(2):  # 注:需 Python 3.11+
            await slow()
    except TimeoutError:
        print("context timeout!")

    # 4.3 手动取消
    t = asyncio.create_task(slow())
    await asyncio.sleep(1)
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        print("task cancelled confirmed")

if __name__ == "__main__":
    asyncio.run(main())

小贴士:被取消的任务应正确处理 CancelledError,并在需要时做清理(finally)。

5. 限流与同步原语:Semaphore、Lock、Event、Condition

# demo_sync_primitives.py
import asyncio, random

sem = asyncio.Semaphore(3)  # 同时最多3个并发
lock = asyncio.Lock()
evt = asyncio.Event()

async def worker(i):
    async with sem:  # 限制并发
        await asyncio.sleep(random.uniform(0.2, 1.0))
        async with lock:  # 保护共享输出(示意)
            print(f"worker {i} done")

async def notifier():
    await asyncio.sleep(1)
    evt.set()  # 广播事件

async def waiter():
    print("waiting for event...")
    await evt.wait()
    print("event received!")

async def main():
    tasks = [asyncio.create_task(worker(i)) for i in range(10)]
    tasks += [asyncio.create_task(notifier()), asyncio.create_task(waiter())]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

Condition 适合更复杂的“等待某条件成立”的场景,用法与 threading.Condition 类似(只是换成 async with / await)。

6. 生产者-消费者:asyncio.Queue

# demo_queue.py
import asyncio, random

async def producer(q: asyncio.Queue):
    for i in range(10):
        await asyncio.sleep(random.uniform(0.1, 0.4))
        await q.put((i, f"data-{i}"))
        print(f"produced {i}")
    await q.put(None)  # 结束哨兵

async def consumer(q: asyncio.Queue):
    while True:
        item = await q.get()
        if item is None:
            q.task_done()
            break
        i, data = item
        await asyncio.sleep(0.3)
        print(f"consumed {i} -> {data}")
        q.task_done()

async def main():
    q = asyncio.Queue(maxsize=5)
    prod = asyncio.create_task(producer(q))
    cons = asyncio.create_task(consumer(q))
    await asyncio.gather(prod)
    await q.join()        # 等全部消费完成
    await cons            # 等消费者退出

if __name__ == "__main__":
    asyncio.run(main())

7. 任务编组(Python 3.11+):asyncio.TaskGroup

# demo_taskgroup.py
import asyncio, random

async def job(n):
    await asyncio.sleep(random.uniform(0.2, 1.0))
    if n == 3:
        raise RuntimeError("boom at 3")
    return n

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(job(i)) for i in range(5)]
            # 出错会自动取消其余任务并向外传播异常
    except* RuntimeError as eg:  # PEP 654(ExceptionGroup)
        print("caught:", eg)

if __name__ == "__main__":
    asyncio.run(main())

对比 gatherTaskGroup 在结构化并发上更可靠,失败会自动收拢与传播。

8. 与线程/同步代码协作:to_thread、run_in_executor

# demo_thread_bridge.py
import asyncio, time, concurrent.futures

def blocking_io(n):
    time.sleep(n)
    return f"blocking {n}s"

async def main():
    # 8.1 Python 3.9+ 推荐:to_thread
    r1 = await asyncio.to_thread(blocking_io, 1)
    print("to_thread:", r1)

    # 8.2 传统:run_in_executor
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
        futs = [loop.run_in_executor(pool, blocking_io, i) for i in (1, 2, 1)]
        for r in await asyncio.gather(*futs):
            print("executor:", r)

if __name__ == "__main__":
    asyncio.run(main())

原则:CPU 密集就放线程/进程池;I/O 密集await 原生异步接口。

9. TCP/UDP 网络编程(内置 Streams / Protocols)

9.1 TCP Echo(Server & Client,基于 Streams)

# demo_tcp_echo.py
import asyncio

async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    addr = writer.get_extra_info('peername')
    print(f"client connected: {addr}")
    try:
        while data := await reader.readline():
            msg = data.decode().rstrip()
            print(f"recv: {msg}")
            writer.write((msg + "\n").encode())
            await writer.drain()
    except asyncio.CancelledError:
        raise
    finally:
        writer.close()
        await writer.wait_closed()
        print("client closed", addr)

async def run_server():
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 8888)
    addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)
    print(f"Serving on {addrs}")
    async with server:
        await server.serve_forever()

async def run_client():
    reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
    for i in range(3):
        writer.write(f"hello {i}\n".encode())
        await writer.drain()
        echo = await reader.readline()
        print("echo:", echo.decode().rstrip())
    writer.close()
    await writer.wait_closed()

async def main():
    server_task = asyncio.create_task(run_server())
    await asyncio.sleep(0.2)
    await run_client()
    server_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await server_task

if __name__ == "__main__":
    import contextlib
    asyncio.run(main())

9.2 UDP(Datagram)

# demo_udp.py
import asyncio

class EchoServer(asyncio.DatagramProtocol):
    def datagram_received(self, data, addr):
        print("server recv:", data, "from", addr)
        self.transport.sendto(data, addr)

async def main():
    loop = asyncio.get_running_loop()
    transport, _ = await loop.create_datagram_endpoint(
        lambda: EchoServer(), local_addr=("127.0.0.1", 9999)
    )
    # client
    on_resp = loop.create_future()
    class Client(asyncio.DatagramProtocol):
        def datagram_received(self, data, addr):
            print("client recv:", data)
            on_resp.set_result(None)
    ctransport, _ = await loop.create_datagram_endpoint(
        lambda: Client(), remote_addr=("127.0.0.1", 9999)
    )
    ctransport.sendto(b"hello-udp")
    await on_resp
    transport.close()
    ctransport.close()

if __name__ == "__main__":
    asyncio.run(main())

10. 子进程(异步等待):asyncio.create_subprocess_exec

# demo_subprocess.py
import asyncio, sys

async def main():
    # 跨平台示例:调用 python -c 'print("hi")'
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", 'print("hi from child")',
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    out, err = await proc.communicate()
    print("stdout:", out.decode().strip(), "| code:", proc.returncode)

if __name__ == "__main__":
    asyncio.run(main())

11. 异步上下文管理器与迭代器:async with、async for

# demo_async_with_for.py
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_resource():
    print("acquire resource")
    await asyncio.sleep(0.2)
    try:
        yield "RESOURCE"
    finally:
        await asyncio.sleep(0.2)
        print("release resource")

class AsyncCounter:
    def __init__(self, n): self.n=n; self.i=0
    def __aiter__(self): return self
    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.i += 1
        return self.i

async def main():
    async with open_resource() as r:
        print("using:", r)
    async for x in AsyncCounter(5):
        print("got:", x)

if __name__ == "__main__":
    asyncio.run(main())

12. 超实用模式集

12.1 背压与批处理

# demo_backpressure.py
import asyncio, random

async def producer(q):
    for i in range(30):
        await q.put(i)             # maxsize 限制可形成背压
        await asyncio.sleep(0.05)
    await q.put(None)

async def consumer(q):
    batch = []
    while True:
        item = await q.get()
        if item is None:
            if batch:
                print("flush batch:", batch)
            q.task_done()
            break
        batch.append(item)
        if len(batch) >= 8:
            # 模拟批量处理
            await asyncio.sleep(random.uniform(0.1, 0.3))
            print("process batch:", batch)
            batch.clear()
        q.task_done()

async def main():
    q = asyncio.Queue(maxsize=10)
    await asyncio.gather(producer(q), consumer(q))
    await q.join()

if __name__ == "__main__":
    asyncio.run(main())

12.2 幂等重试 + 指数退避

# demo_retry.py
import asyncio, random

async def fragile_call():
    await asyncio.sleep(0.1)
    if random.random() < 0.7:
        raise RuntimeError("transient")
    return "ok"

async def retry(coro_func, attempts=5, base=0.2):
    for n in range(attempts):
        try:
            return await coro_func()
        except Exception as e:
            if n == attempts - 1:
                raise
            await asyncio.sleep(base * (2 ** n))  # 退避
    raise RuntimeError("unreachable")

async def main():
    try:
        r = await retry(fragile_call)
        print("result:", r)
    except Exception as e:
        print("failed:", e)

if __name__ == "__main__":
    asyncio.run(main())

13. 常见坑与最佳实践

入口统一用 asyncio.run(main()),不要混用早期 API(如手动获取 loop、run_until_complete),除非有特殊需求。

避免阻塞调用(如 time.sleep()、重 CPU 任务)直接出现在协程里;改用 await asyncio.sleep()asyncio.to_thread()/进程池。

正确处理取消:在 try/except/finally 中传播 CancelledError,避免吞掉取消导致任务“僵尸化”。

使用限流:对外部服务/磁盘/网络做 Semaphore、队列背压,保护系统。

结构化并发:优先考虑 TaskGroup(3.11+)来让“任务生命周期”明确,异常聚合安全。

日志与超时:给关键 I/O 加 timeout,并使用 asyncio.create_task() 后保存句柄,便于监控和取消。

14. 进阶延伸(可选)

文件异步:标准库没有真正异步文件 I/O;可选三方库 aiofiles(仅示意):

# pip install aiofiles
import aiofiles, asyncio
async def read_file(p):
    async with aiofiles.open(p, "r", encoding="utf-8") as f:
        return await f.read()

HTTP 客户端/服务端:aiohttphttpx[http2] 等第三方库更贴近真实网络场景。

与 GUI/Qt 的集成:可通过 qasync 把 Qt 事件循环与 asyncio 融合(适合你在 QML/PySide6 下需要异步网络/IO 的情况)。

到此这篇关于由浅入深介绍python asyncio的各种用法与代码示例的文章就介绍到这了,更多相关python asyncio用法内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文