python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python asyncio.Queue内存泄漏问题解决

详解Python中asyncio.Queue长连接服务的内存泄漏问题解决方法

作者:十有八七

这篇文章主要为大家详细介绍了Python中asyncio.Queue长连接服务的内存泄漏问题解决方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下

问题背景

在基于 asyncio 的长连接服务(如 SSE 推送、WebSocket、实时仪表盘)中,事件总线是最常见的基础设施之一。核心模式很简单:

# 订阅者注册一个 asyncio.Queue
queue = event_bus.subscribe("task:progress")

# 发布者往所有匹配的 Queue 里 put_nowait
event_bus.emit(Event("task:progress", data={"percent": 80}))

但生产环境中,订阅者的生命周期往往不可控——前端断连、浏览器关闭、SSE 连接超时、客户端忘记调用 unsubscribe……这些场景都会导致一个结果:

Queue 永远不会被消费,也永远不会被移除。

每次新连接创建一个 Queue,断连后 Queue 就成了"孤儿"——仍然挂在订阅列表里,emit() 时仍会往里面塞数据。日积月累,内存持续增长,最终 OOM。

这就是 长连接服务的隐形内存泄漏:不是你的代码写错了,而是你没有为"不正常的离开"兜底

错误方案与它们的缺陷

方案 A:依赖客户端主动 unsubscribe

# SSE 端点
async def sse_endpoint(request):
    queue = event_bus.subscribe("task:progress")
    try:
        while True:
            event = await queue.get()
            yield sse_format(event)
    finally:
        event_bus.unsubscribe(queue)

问题finally 只在协程正常退出时执行。如果客户端直接断开 TCP 连接,服务端的 queue.get() 可能还在 await 中,协程不会立刻被取消。即使框架做了清理,也存在时间窗口——连接断开到协程被取消之间的几百毫秒内,emit() 已经往死 Queue 里塞了若干事件。

更关键的是:你无法假设所有调用方都记得 unsubscribe。EventBus 是基础设施代码,它必须对自己的健康负责。

方案 B:给 Queue 设 maxlen

queue = asyncio.Queue(maxsize=100)

问题maxsize 限制的是 put 的阻塞,不是内存。用 put_nowait 时,超出会直接抛 QueueFull 异常——你要么吞掉异常(丢事件),要么让 emit 失败(影响所有订阅者)。而 maxlen=0(无限)是默认行为,改不了。

collections.dequemaxlen 可以自动丢弃旧数据,但 asyncio.Queue 没有。

方案 C:弱引用 WeakRef

self._subscribers.append(weakref.ref(queue))

问题asyncio.Queue 一旦没有强引用就会立刻被 GC 回收——但 Queue 本身就是被 EventBus 持有的,你恰恰需要 EventBus 来引用它。弱引用在这里逻辑上自相矛盾。

正确方案:TrackedQueue + 后台清理协程

核心思路:让 Queue 自己告诉系统"我还活着",系统定期检查谁已经沉默太久

第一步:包装 Queue,记录最后活跃时间

class _TrackedQueue(asyncio.Queue[Event]):
    """带追踪信息的 Queue,记录最后消费时间和订阅参数"""

    __slots__ = ("last_access", "_sub_event_type", "_sub_task_id")

    def __init__(
        self,
        event_type: str | None = None,
        task_id: uuid.UUID | None = None,
    ) -> None:
        super().__init__()
        self.last_access: float = time.monotonic()
        self._sub_event_type = event_type
        self._sub_task_id = task_id

    def touch(self) -> None:
        self.last_access = time.monotonic()

关键设计

设计点说明
继承 asyncio.Queue对消费方完全透明,await queue.get() 无需任何改动
__slots__避免每个实例创建 __dict__,大量 Queue 场景下节省内存
last_accessmonotonic不受系统时间回调影响,比 time.time() 更安全
保存订阅参数清理时需要知道"这个 Queue 挂在哪个列表里",否则要从所有列表里线性搜索

第二步:全局注册表 + 统一清理入口

class EventBus:
    def __init__(self) -> None:
        self._subscribers: dict[str, list[_TrackedQueue]] = defaultdict(list)
        self._global_subscribers: list[_TrackedQueue] = []
        self._task_subscribers: dict[uuid.UUID, list[_TrackedQueue]] = defaultdict(list)
        self._all_queues: set[_TrackedQueue] = set()  # ← 全局注册表
        self._cleanup_task: asyncio.Task | None = None

    def _remove_queue(self, q: _TrackedQueue) -> None:
        """从所有订阅列表中移除队列"""
        self._all_queues.discard(q)
        if q._sub_task_id is not None:
            subs = self._task_subscribers.get(q._sub_task_id, [])
            if q in subs:
                subs.remove(q)
                if not subs:
                    del self._task_subscribers[q._sub_task_id]
        elif q._sub_event_type is None:
            if q in self._global_subscribers:
                self._global_subscribers.remove(q)
        else:
            subs = self._subscribers.get(q._sub_event_type, [])
            if q in subs:
                subs.remove(q)

为什么需要 _all_queues

订阅者按 event_type / task_id / 全局分三组存储。如果没有全局注册表,清理时需要遍历所有分组查找——O(n) 且代码丑陋。_all_queues 是一个 set,查找 O(1),遍历一遍就能找出所有闲置 Queue。

_remove_queue 根据 Queue 自身保存的订阅参数,精准定位它所在的分组列表,一次调用搞定所有清理

第三步:后台清理协程(懒启动)

_QUEUE_IDLE_TIMEOUT = 300  # 5 分钟
_CLEANUP_INTERVAL = 60      # 60 秒检查一次

def _start_cleanup_loop(self) -> None:
    """懒启动后台清理协程"""
    if self._cleanup_task is not None:
        return
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return
    self._cleanup_task = loop.create_task(self._cleanup_loop())

async def _cleanup_loop(self) -> None:
    while True:
        await asyncio.sleep(_CLEANUP_INTERVAL)
        self._cleanup_idle_queues()

def _cleanup_idle_queues(self) -> None:
    now = time.monotonic()
    stale = [q for q in self._all_queues if now - q.last_access > _QUEUE_IDLE_TIMEOUT]
    for q in stale:
        self._remove_queue(q)

懒启动:在 subscribe() 首次被调用时才启动清理协程。避免在 import 时或无订阅者的场景下创建无用的 Task。

第四步:在 emit 中 touch

def emit(self, event: Event) -> None:
    if event.task_id is not None:
        for queue in self._task_subscribers.get(event.task_id, []):
            queue.touch()  # ← 标记活跃
            queue.put_nowait(event)

    for queue in self._subscribers.get(event.event_type, []):
        queue.touch()
        queue.put_nowait(event)

    for queue in self._global_subscribers:
        queue.touch()
        queue.put_nowait(event)

touch() 的时机选择:在 emit 时调用,而非在 queue.get() 时。

为什么?因为 emit 是 EventBus 自己的代码路径,完全可控。而 queue.get() 发生在消费方的协程里,EventBus 无法插手也不应该侵入。emit 时 touch 的语义是:"有人还在往这个 Queue 塞数据"——如果连 emit 都不再光顾这个 Queue 了,说明订阅的事件类型/任务已经没有新事件产生,Queue 的闲置是合理的。

如果需要更严格的活跃判定(消费者是否真的在读取),可以在 events() 异步迭代器的 yield 前也加 touch()

完整数据流

subscribe()                    emit()                   cleanup_loop()
    │                             │                          │
    ├─ _TrackedQueue()            ├─ queue.touch()           ├─ sleep(60s)
    ├─ _all_queues.add(q)         ├─ queue.put_nowait(e)     ├─ scan _all_queues
    ├─ 注册到分组列表              │                          ├─ 找 idle > 300s
    └─ _start_cleanup_loop()      │                          └─ _remove_queue(q)

可观测性:stats 属性

@property
def stats(self) -> dict[str, int]:
    return {
        "global_subscribers": len(self._global_subscribers),
        "type_subscribers": sum(len(v) for v in self._subscribers.values()),
        "task_subscribers": sum(len(v) for v in self._task_subscribers.values()),
        "total_queues": len(self._all_queues),
    }

暴露给 /api/stats 或 Prometheus,可以直接观察到:

模式总结

元素作用对应概念
_TrackedQueue包装原始 Queue,附加追踪元数据装饰器模式(继承式)
_all_queues set全局注册表,O(1) 查找 + 统一遍历索引/注册表
touch()标记"我还在用"心跳/保活
_cleanup_loop后台定期扫描并回收GC 守护线程(协程版)
懒启动首次 subscribe 时才创建 Task延迟初始化
订阅参数保存在 Queue清理时精准定位列表自描述对象

这个模式的通用性

不限于 EventBus。任何"生产者 → Queue → 消费者"模型中,如果消费者可能静默退出(网络断连、进程崩溃、逻辑错误),都可以用这个模式:

关键参数调优

参数建议值调优思路
_QUEUE_IDLE_TIMEOUT300s(5 min)取决于事件频率。高频事件可缩短到 60s,低频可延长到 15min
_CLEANUP_INTERVAL60s扫描开销极小(只比时间戳),可以更频繁如 30s。但没必要低于 10s

对比其他语言的类似方案

语言/框架类似机制
Gocontext.WithTimeout + select,超时自动退出 goroutine
JavaWeakReference + ReferenceQueue,GC 时回调清理
RustArc<Weak> + 定期 upgrade() 检查
KubernetesLease 对象 + 定期续约,超时自动删除

Python 没有 Go 的 context 取消链,没有 Java 的 GC 回调,没有 Rust 的所有权。Python 的方式是显式的:你自己写清理逻辑,自己保证执行。 这也是这个模式存在的意义。

附录:完整实现

参考源码:

"""事件总线 — asyncio.Queue 实现的发布/订阅

支持:
- 发布事件(emit)
- 订阅事件(subscribe)
- 按 event_type 过滤
- 按 task_id 过滤
- 超时队列自动清理(防止内存泄漏)
"""

from __future__ import annotations

import asyncio
import time
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, AsyncIterator

# 队列最大闲置时间(秒),超过此时间未被消费则视为泄漏
_QUEUE_IDLE_TIMEOUT = 300  # 5 minutes


@dataclass
class Event:
    """通用事件"""
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)
    source: str | None = None  # 事件来源(如 node_id)
    task_id: uuid.UUID | None = None  # 所属任务


class _TrackedQueue(asyncio.Queue[Event]):
    """带追踪信息的 Queue,记录最后消费时间和订阅参数"""

    __slots__ = ("last_access", "_sub_event_type", "_sub_task_id")

    def __init__(
        self,
        event_type: str | None = None,
        task_id: uuid.UUID | None = None,
    ) -> None:
        super().__init__()
        self.last_access: float = time.monotonic()
        self._sub_event_type = event_type
        self._sub_task_id = task_id

    def touch(self) -> None:
        self.last_access = time.monotonic()


class EventBus:
    """异步事件总线"""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[_TrackedQueue]] = defaultdict(list)
        self._global_subscribers: list[_TrackedQueue] = []
        self._task_subscribers: dict[uuid.UUID, list[_TrackedQueue]] = defaultdict(list)
        self._all_queues: set[_TrackedQueue] = set()
        self._cleanup_task: asyncio.Task | None = None  # type: ignore[type-arg]

    def _start_cleanup_loop(self) -> None:
        """启动后台清理协程(懒启动)"""
        if self._cleanup_task is not None:
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return
        self._cleanup_task = loop.create_task(self._cleanup_loop())

    async def _cleanup_loop(self) -> None:
        """定期清理超时队列"""
        while True:
            await asyncio.sleep(60)  # 每 60 秒检查一次
            self._cleanup_idle_queues()

    def _cleanup_idle_queues(self) -> None:
        """清理闲置超时的队列"""
        now = time.monotonic()
        stale = [q for q in self._all_queues if now - q.last_access > _QUEUE_IDLE_TIMEOUT]
        for q in stale:
            self._remove_queue(q)

    def _remove_queue(self, q: _TrackedQueue) -> None:
        """从所有订阅列表中移除队列"""
        self._all_queues.discard(q)
        if q._sub_task_id is not None:
            subs = self._task_subscribers.get(q._sub_task_id, [])
            if q in subs:
                subs.remove(q)
                if not subs:
                    del self._task_subscribers[q._sub_task_id]
        elif q._sub_event_type is None:
            if q in self._global_subscribers:
                self._global_subscribers.remove(q)
        else:
            subs = self._subscribers.get(q._sub_event_type, [])
            if q in subs:
                subs.remove(q)

    def emit(self, event: Event) -> None:
        """发布事件到所有匹配的订阅者"""
        # 按 task_id 订阅(最高优先级,精准匹配)
        if event.task_id is not None:
            for queue in self._task_subscribers.get(event.task_id, []):
                queue.touch()
                queue.put_nowait(event)

        # 按类型订阅
        for queue in self._subscribers.get(event.event_type, []):
            queue.touch()
            queue.put_nowait(event)
        # 全局订阅
        for queue in self._global_subscribers:
            queue.touch()
            queue.put_nowait(event)

    def subscribe(
        self,
        event_type: str | None = None,
        task_id: uuid.UUID | None = None,
    ) -> asyncio.Queue[Event]:
        """订阅事件

        Args:
            event_type: 事件类型,None 表示不按类型过滤
            task_id: 任务ID,指定后只接收该任务的事件

        Returns:
            asyncio.Queue,消费方通过 await queue.get() 获取事件
        """
        self._start_cleanup_loop()
        queue = _TrackedQueue(event_type=event_type, task_id=task_id)
        self._all_queues.add(queue)
        if task_id is not None:
            self._task_subscribers[task_id].append(queue)
        elif event_type is None:
            self._global_subscribers.append(queue)
        else:
            self._subscribers[event_type].append(queue)
        return queue

    def unsubscribe(
        self,
        queue: asyncio.Queue[Event],
        event_type: str | None = None,
        task_id: uuid.UUID | None = None,
    ) -> None:
        """取消订阅"""
        if isinstance(queue, _TrackedQueue):
            self._remove_queue(queue)
            return
        # fallback: 对非 TrackedQueue 的老式调用
        if task_id is not None:
            subs = self._task_subscribers.get(task_id, [])
            if queue in subs:
                subs.remove(queue)
                if not subs:
                    del self._task_subscribers[task_id]
        elif event_type is None:
            if queue in self._global_subscribers:
                self._global_subscribers.remove(queue)
        else:
            subs = self._subscribers.get(event_type, [])
            if queue in subs:
                subs.remove(queue)

    async def events(
        self,
        event_type: str | None = None,
        task_id: uuid.UUID | None = None,
    ) -> AsyncIterator[Event]:
        """异步迭代订阅的事件"""
        queue = self.subscribe(event_type, task_id)
        try:
            while True:
                event = await queue.get()
                if isinstance(queue, _TrackedQueue):
                    queue.touch()
                yield event
        finally:
            self.unsubscribe(queue, event_type, task_id)

    def clear(self) -> None:
        """清空所有订阅"""
        self._subscribers.clear()
        self._global_subscribers.clear()
        self._task_subscribers.clear()
        self._all_queues.clear()

    @property
    def stats(self) -> dict[str, int]:
        """返回当前订阅统计"""
        return {
            "global_subscribers": len(self._global_subscribers),
            "type_subscribers": sum(len(v) for v in self._subscribers.values()),
            "task_subscribers": sum(len(v) for v in self._task_subscribers.values()),
            "total_queues": len(self._all_queues),
        }


# 全局事件总线实例
_event_bus: EventBus | None = None


def get_event_bus() -> EventBus:
    """获取全局事件总线"""
    global _event_bus
    if _event_bus is None:
        _event_bus = EventBus()
    return _event_bus

以上就是详解Python中asyncio.Queue长连接服务的内存泄漏问题解决方法的详细内容,更多关于Python asyncio.Queue内存泄漏问题解决的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文