python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python在同步方法中执行协程方法

Python在同步方法中执行协程方法的实现

作者:无风听海

Python同步方法无法直接执行协程,因其返回协程对象需事件循环调度,五种解决方案涵盖不同场景:asyncio.run()适用于脚本,loop.run()需显式控制,nest_asyncio解决嵌套问题但不推荐生产,线程安全方法用于框架内任务,线程池运行协程为通用推荐方案

一、问题描述:同步方法无法直接执行协程

在 Python 中,协程(async def 函数)返回的是一个 协程对象,它不会自动执行,必须被事件循环调度执行。

同步方法中既不能使用 await,也没有运行事件循环,因此直接调用协程会导致如下问题:

async def my_coroutine():
    return 42

def sync_func():
    result = my_coroutine()  # ❌ 这里只是得到 coroutine 对象
    print(result)

# 输出类似:<coroutine object my_coroutine at 0x...>

更严重的是:

如果你尝试手动运行协程,但事件循环已经在运行(如 Jupyter Notebook、某些异步框架如 FastAPI),会抛出异常:

RuntimeError: This event loop is already running

二、解决方案总览

方案编号方法适用环境是否阻塞主线程可嵌套性
1asyncio.run()脚本 / CLI 程序主函数✅ 阻塞❌ 不能嵌套
2loop.run_until_complete()事件循环未运行的场景✅ 阻塞❌ 不能嵌套
3nest_asyncioJupyter / 教学用途✅ 阻塞✅ 可以嵌套
4asyncio.run_coroutine_threadsafe()异步框架 / 多线程后台任务✅ 阻塞✅ 安全
5线程池中运行事件循环通用同步环境✅ 阻塞✅ 安全

三、详细方案分析

方案一:使用asyncio.run()

示例

import asyncio

async def async_func():
    await asyncio.sleep(1)
    return "result"

def sync_func():
    result = asyncio.run(async_func())
    print(result)

场景适用

注意

RuntimeError: asyncio.run() cannot be called from a running event loop

方案二:使用loop.run_until_complete()

示例

import asyncio

async def async_func():
    await asyncio.sleep(1)
    return "result"

def sync_func():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(async_func())
    print(result)

场景适用

注意

RuntimeError: This event loop is already running

方案三:使用nest_asyncio允许事件循环嵌套

示例

pip install nest_asyncio
import asyncio
import nest_asyncio

nest_asyncio.apply()

async def async_func():
    await asyncio.sleep(1)
    return "nested result"

def sync_func():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(async_func())
    print(result)

场景适用

注意

方案四:使用asyncio.run_coroutine_threadsafe()(线程安全)

示例

import asyncio
import threading

async def async_func():
    await asyncio.sleep(1)
    return "from thread-safe"

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def sync_func():
    loop = asyncio.new_event_loop()
    threading.Thread(target=start_loop, args=(loop,), daemon=True).start()

    future = asyncio.run_coroutine_threadsafe(async_func(), loop)
    result = future.result()  # 阻塞直到完成
    print(result)

场景适用

注意

方案五:在线程池中运行事件循环(推荐通用方法)

示例

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def async_func():
    await asyncio.sleep(1)
    return "from thread pool"

def run_coroutine_in_thread(coro):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)

def sync_func():
    with ThreadPoolExecutor() as executor:
        future = executor.submit(run_coroutine_in_thread, async_func())
        result = future.result()
        print(result)

sync_func()

场景适用

优点

四、总结对比表(按推荐优先级)

方法是否可嵌套是否阻塞推荐使用场景
asyncio.run()✅ 是普通脚本、CLI 程序
loop.run_until_complete()✅ 是明确控制事件循环、服务初始化
nest_asyncio✅ 是Jupyter、教学(⚠️ 不用于生产)
run_coroutine_threadsafe()✅ 是后台线程任务调度,框架内部
线程池运行协程✅ 是✅ 通用方法,兼容所有同步环境

最佳实践建议

def run_async(coro):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)
    else:
        # 如果事件循环正在运行,使用线程池运行
        from concurrent.futures import ThreadPoolExecutor
        with ThreadPoolExecutor() as pool:
            return pool.submit(lambda: asyncio.run(coro)).result()

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文