python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python loop.run_in_executor

Python中的loop.run_in_executor基本用法

作者:无风听海

文章介绍了在Python的异步编程中使用`loop.run_in_executor`来执行阻塞操作的方法,包括基本用法、线程池和进程池的选择、与协程的差异对比、进阶用法、注意事项以及真实场景举例,感兴趣的朋友跟随小编一起看看吧

✅ 一、背景:为什么需要loop.run_in_executor

在 Python 的异步编程(asyncio)中,协程可以并发运行,提高效率,但它们依赖于“非阻塞的 I/O”。如果你在协程中调用了一个阻塞的操作(比如 time.sleep()requests.get() 等),它会阻塞整个事件循环,导致其他协程也无法继续执行

为了解决这个问题,Python 提供了 loop.run_in_executor(),允许你把阻塞的同步代码“放在后台线程或进程中执行”,从而不影响主事件循环。

🧪 二、基本用法和执行流程

✅ 基本结构:

loop.run_in_executor(executor, func, *args)

✅ 最小示例(阻塞函数放到线程中):

import asyncio
import time
def blocking_func(name):
    print(f"开始阻塞任务 {name}")
    time.sleep(3)
    print(f"结束阻塞任务 {name}")
    return f"{name} done"
async def main():
    loop = asyncio.get_running_loop()
    # 把阻塞函数丢进默认线程池
    result = await loop.run_in_executor(None, blocking_func, "任务A")
    print(result)
asyncio.run(main())

⏳ 输出:

开始阻塞任务 任务A
结束阻塞任务 任务A
任务A done

注意:虽然 blocking_func() 是阻塞的,但不会阻塞 asyncio 主事件循环,因此你可以同时运行其他协程。

🧵 三、线程池 vs 进程池

1. ThreadPoolExecutor(默认)

2. ProcessPoolExecutor

示例(使用 ProcessPoolExecutor):

from concurrent.futures import ProcessPoolExecutor
def compute(n):
    return sum(i * i for i in range(n))
async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        result = await loop.run_in_executor(executor, compute, 10_000_000)
        print(result)
asyncio.run(main())

📊 四、run_in_executor 与协程的差异对比

特性协程 (async def)run_in_executor
是否阻塞事件循环
适用于异步 I/O 操作同步阻塞操作
是否需要线程或进程是(线程或进程池)
是否自动并发
是否可中断可以使用 asyncio.CancelledError线程执行不能中断

🔁 五、与其他异步写法对比

❌ 错误做法(会阻塞整个事件循环):

import asyncio
import time
async def wrong():
    time.sleep(2)  # 阻塞整个事件循环!
    print("完成")
asyncio.run(wrong())

✅ 正确做法(使用run_in_executor):

async def correct():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 2)
    print("完成")

🔐 六、进阶用法:并行多个任务

import asyncio
import time
def task(name, duration):
    print(f"开始 {name}")
    time.sleep(duration)
    print(f"结束 {name}")
    return name
async def main():
    loop = asyncio.get_running_loop()
    tasks = [
        loop.run_in_executor(None, task, 'A', 2),
        loop.run_in_executor(None, task, 'B', 3),
        loop.run_in_executor(None, task, 'C', 1),
    ]
    results = await asyncio.gather(*tasks)
    print("全部完成:", results)
asyncio.run(main())

⏳ 输出(并行):

开始 A
开始 B
开始 C
结束 C
结束 A
结束 B
全部完成: ['A', 'B', 'C']

⚠️ 七、注意事项与潜在陷阱

问题描述与解决
共享资源问题多线程操作同一个变量可能会出错,考虑加锁或使用 asyncio.Queue
线程池大小限制默认线程池大小有限(通常为 CPU 核心数的 5 倍),可手动调整
异常处理在线程中抛出的异常必须在主线程中 await 时捕获
进程池不能用 lambda进程池中的函数必须是可序列化的,不能是匿名函数或本地函数
不能中断线程任务一旦 run_in_executor 开始执行函数,无法强制中断线程任务

💼 八、真实场景举例

示例 1:读取大文件(I/O 密集型)

def read_file(path):
    with open(path, 'r') as f:
        return f.read()
data = await loop.run_in_executor(None, read_file, "bigfile.txt")

示例 2:调用同步网络库(如requests)

import requests
def fetch(url):
    response = requests.get(url)
    return response.text
html = await loop.run_in_executor(None, fetch, "https://example.com")

实际建议:使用 httpx.AsyncClient 替代 requests

🧩 九、封装通用工具函数(推荐写法)

import asyncio
from typing import Callable, Any
from functools import partial
async def to_thread(func: Callable, *args, **kwargs) -> Any:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, partial(func, *args, **kwargs))
# 使用方式
result = await to_thread(my_blocking_function, arg1, arg2)

✅ 十、Python 3.9+ 新特性:asyncio.to_thread

从 Python 3.9 起,你可以直接用内置的 asyncio.to_thread() 来代替 run_in_executor(None, ...),更加简洁:

import asyncio
def blocking_func():
    ...
await asyncio.to_thread(blocking_func)

等价于:

await loop.run_in_executor(None, blocking_func)

📘 总结

特点内容
功能异步执行阻塞的同步函数
用法await loop.run_in_executor(executor, func, *args)
默认线程池executor=None
用于场景文件 I/O、网络请求、同步数据库操作、CPU 密集计算
替代方案Python 3.9+: asyncio.to_thread()

到此这篇关于Python中的loop.run_in_executor基本用法的文章就介绍到这了,更多相关Python loop.run_in_executor内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文