python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python内存泄漏

一文带你系统掌握Python中内存泄漏的诊断与解决方案

作者:铭渊老黄

本文深入探讨了 Python 内存泄漏问题及其解决方案,重点介绍了 tracemalloc 和 objgraph 两大工具的使用方法,下面就跟随小编一起学习一下吧

引言:当程序变成"内存黑洞"

凌晨三点,我被运维的电话吵醒:"你们的数据处理服务又崩了!内存占用从 2GB 飙到 32GB,服务器直接 OOM 重启!"这已经是本月第三次了。

那是我职业生涯中最难熬的一周。白天正常运行的服务,到了晚上就像失控的野兽,疯狂吞噬内存。我尝试了所有能想到的方法:检查日志、审查代码、增加内存限制……问题依旧。直到我掌握了 tracemallocobjgraph 这两大利器,才终于揪出了隐藏在缓存层中的内存泄漏元凶。

今天,我将通过真实案例,带你系统掌握 Python 内存泄漏的诊断与解决方案。无论你是刚遇到内存问题的新手,还是想深化调优技能的资深开发者,这篇文章都将成为你的实战手册。

一、内存泄漏基础:理解问题本质

1.1 什么是内存泄漏

在 Python 中,内存泄漏指的是:程序持续分配内存但无法释放已不再使用的对象,导致可用内存逐渐减少

# 经典内存泄漏示例
class DataCache:
    def __init__(self):
        self._cache = {}  # 永远不清理的缓存
    
    def add_data(self, key, value):
        self._cache[key] = value  # 数据只增不减
    
    def process_request(self, request_id, data):
        # 每个请求都缓存数据,从不删除
        self.add_data(request_id, data)
        return f"Processed {request_id}"

# 使用示例
cache = DataCache()
for i in range(1000000):
    # 一百万次请求后,内存爆炸!
    cache.process_request(f"req_{i}", "x" * 1000)

1.2 Python 的内存管理机制

Python 使用**引用计数 + 垃圾回收(GC)**机制管理内存:

import sys

# 引用计数示例
obj = [1, 2, 3]
print(f"初始引用计数: {sys.getrefcount(obj) - 1}")  # -1 因为 getrefcount 自己也引用了

ref1 = obj
print(f"增加引用后: {sys.getrefcount(obj) - 1}")

del ref1
print(f"删除引用后: {sys.getrefcount(obj) - 1}")

# 循环引用问题
class Node:
    def __init__(self, value):
        self.value = value
        self.next = None

# 创建循环引用
node1 = Node(1)
node2 = Node(2)
node1.next = node2
node2.next = node1  # 循环!

# 即使删除引用,循环内的对象也不会立即释放
del node1, node2
# GC 会在后台处理,但可能有延迟

1.3 常见内存泄漏场景

# 场景一:全局容器无限增长
global_logs = []

def log_event(event):
    global_logs.append(event)  # 永不清理

# 场景二:闭包捕获大对象
def create_handler(large_data):
    def handler():
        # 闭包持有 large_data 引用
        return len(large_data)
    return handler

# 场景三:未正确关闭资源
class FileProcessor:
    def __init__(self, filename):
        self.file = open(filename)  # 没有 __del__ 或 __exit__
    
    def process(self):
        return self.file.read()

# 场景四:缓存未设置过期策略
cache = {}
def get_or_compute(key):
    if key not in cache:
        cache[key] = expensive_computation(key)
    return cache[key]

def expensive_computation(key):
    return [0] * 1000000  # 模拟大对象

二、tracemalloc:Python 内置的内存追踪利器

2.1 基础使用与快照对比

import tracemalloc
import linecache

def display_top_memory(snapshot, key_type='lineno', limit=10):
    """显示内存占用 Top N"""
    snapshot = snapshot.filter_traces((
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
        tracemalloc.Filter(False, "<unknown>"),
    ))
    
    top_stats = snapshot.statistics(key_type)
    
    print(f"\n{'='*70}")
    print(f"Top {limit} 内存占用(按 {key_type} 排序)")
    print(f"{'='*70}")
    
    for index, stat in enumerate(top_stats[:limit], 1):
        frame = stat.traceback[0]
        filename = frame.filename
        lineno = frame.lineno
        
        # 获取源代码
        line = linecache.getline(filename, lineno).strip()
        
        print(f"\n#{index}: {filename}:{lineno}")
        print(f"    {line}")
        print(f"    大小: {stat.size / 1024 / 1024:.1f} MB")
        print(f"    数量: {stat.count} 个对象")

# 实战案例:检测内存泄漏
def memory_leak_example():
    """模拟内存泄漏"""
    tracemalloc.start()
    
    # 快照 1:初始状态
    snapshot1 = tracemalloc.take_snapshot()
    
    # 执行可能泄漏的代码
    leaked_objects = []
    for i in range(10000):
        # 故意泄漏:创建对象但不释放
        leaked_objects.append([0] * 1000)
    
    # 快照 2:执行后状态
    snapshot2 = tracemalloc.take_snapshot()
    
    # 对比快照
    print("\n初始状态内存占用:")
    display_top_memory(snapshot1, limit=5)
    
    print("\n执行后内存占用:")
    display_top_memory(snapshot2, limit=5)
    
    # 分析增量
    top_stats = snapshot2.compare_to(snapshot1, 'lineno')
    
    print(f"\n{'='*70}")
    print("内存增量分析(Top 10)")
    print(f"{'='*70}")
    
    for stat in top_stats[:10]:
        print(f"\n{stat}")
        if stat.count_diff > 0:
            print(f"  ⚠️  新增对象: {stat.count_diff} 个")
            print(f"  ⚠️  内存增加: {stat.size_diff / 1024 / 1024:.2f} MB")
    
    tracemalloc.stop()

# 运行测试
memory_leak_example()

2.2 实战案例:Web 应用内存泄漏诊断

import tracemalloc
from flask import Flask, request
import time

app = Flask(__name__)

# 全局缓存(潜在泄漏点)
request_cache = {}

class MemoryMonitor:
    """内存监控装饰器"""
    def __init__(self):
        self.snapshots = []
        tracemalloc.start()
    
    def capture_snapshot(self, label):
        """捕获内存快照"""
        snapshot = tracemalloc.take_snapshot()
        self.snapshots.append((label, snapshot, time.time()))
        
    def analyze_leak(self, threshold_mb=10):
        """分析内存泄漏"""
        if len(self.snapshots) < 2:
            print("需要至少两个快照进行对比")
            return
        
        for i in range(1, len(self.snapshots)):
            label1, snapshot1, time1 = self.snapshots[i-1]
            label2, snapshot2, time2 = self.snapshots[i]
            
            # 计算内存增量
            top_stats = snapshot2.compare_to(snapshot1, 'lineno')
            
            total_increase = sum(stat.size_diff for stat in top_stats if stat.size_diff > 0)
            increase_mb = total_increase / 1024 / 1024
            
            print(f"\n{'='*70}")
            print(f"对比: {label1} -> {label2}")
            print(f"时间差: {time2 - time1:.2f}秒")
            print(f"内存增加: {increase_mb:.2f} MB")
            print(f"{'='*70}")
            
            if increase_mb > threshold_mb:
                print("⚠️  检测到可能的内存泄漏!")
                print("\n内存增长最多的代码位置:")
                
                for stat in top_stats[:5]:
                    if stat.size_diff > 0:
                        print(f"\n{stat.traceback.format()[0]}")
                        print(f"  增加: {stat.size_diff / 1024 / 1024:.2f} MB")
                        print(f"  新对象: {stat.count_diff} 个")

# 创建监控器
monitor = MemoryMonitor()

@app.before_request
def before_request():
    """请求前捕获快照"""
    request.start_time = time.time()

@app.after_request
def after_request(response):
    """请求后分析内存"""
    if hasattr(request, 'start_time'):
        elapsed = time.time() - request.start_time
        if elapsed > 0.1:  # 慢请求
            monitor.capture_snapshot(f"After {request.path}")
    return response

@app.route('/api/process')
def process_data():
    """模拟处理请求(有内存泄漏)"""
    request_id = request.args.get('id', 'unknown')
    
    # 泄漏点:缓存永不清理
    large_data = [0] * 100000
    request_cache[request_id] = large_data
    
    return {'status': 'ok', 'cached_requests': len(request_cache)}

@app.route('/api/analyze')
def analyze_memory():
    """触发内存分析"""
    monitor.analyze_leak(threshold_mb=5)
    return {'status': 'analysis_complete'}

# 运行测试
if __name__ == '__main__':
    # 模拟请求
    with app.test_client() as client:
        monitor.capture_snapshot("Initial")
        
        # 发送 100 个请求
        for i in range(100):
            client.get(f'/api/process?id={i}')
        
        monitor.capture_snapshot("After 100 requests")
        
        # 再发送 100 个请求
        for i in range(100, 200):
            client.get(f'/api/process?id={i}')
        
        monitor.capture_snapshot("After 200 requests")
        
        # 分析结果
        client.get('/api/analyze')

2.3 高级技巧:追踪特定对象

import tracemalloc
import gc

class ObjectTracker:
    """追踪特定类型对象的内存分配"""
    
    @staticmethod
    def track_allocations(target_type, duration_seconds=10):
        """追踪指定时间内的对象分配"""
        tracemalloc.start()
        initial_snapshot = tracemalloc.take_snapshot()
        
        print(f"开始追踪 {target_type.__name__} 对象,持续 {duration_seconds} 秒...")
        time.sleep(duration_seconds)
        
        final_snapshot = tracemalloc.take_snapshot()
        tracemalloc.stop()
        
        # 分析增量
        top_stats = final_snapshot.compare_to(initial_snapshot, 'lineno')
        
        print(f"\n{target_type.__name__} 对象内存分配分析:")
        for stat in top_stats[:10]:
            if target_type.__name__ in str(stat):
                print(f"\n{stat}")
    
    @staticmethod
    def find_object_sources(obj):
        """查找对象的引用来源"""
        print(f"\n{'='*70}")
        print(f"分析对象: {type(obj).__name__} at {hex(id(obj))}")
        print(f"{'='*70}")
        
        # 获取所有引用该对象的对象
        referrers = gc.get_referrers(obj)
        
        print(f"\n找到 {len(referrers)} 个引用者:")
        for i, ref in enumerate(referrers[:10], 1):
            ref_type = type(ref).__name__
            print(f"\n#{i} 引用者类型: {ref_type}")
            
            if isinstance(ref, dict):
                # 如果是字典,尝试找到键
                for key, value in ref.items():
                    if value is obj:
                        print(f"  字典键: {key}")
                        break
            elif isinstance(ref, (list, tuple)):
                print(f"  容器长度: {len(ref)}")
            
            # 显示引用者的引用者(递归查找)
            second_level = gc.get_referrers(ref)
            if second_level:
                print(f"  被 {len(second_level)} 个对象引用")

# 实战示例
class LeakyCache:
    def __init__(self):
        self.data = {}
    
    def add(self, key, value):
        self.data[key] = value

# 测试
cache = LeakyCache()
for i in range(1000):
    cache.add(f"key_{i}", [0] * 10000)

# 追踪泄漏源
ObjectTracker.find_object_sources(cache.data)

三、objgraph:可视化对象关系图谱

3.1 安装与基础使用

# 安装
pip install objgraph

# 生成图谱需要 Graphviz
# Ubuntu/Debian
sudo apt-get install graphviz

# macOS
brew install graphviz

# Windows
# 从 https://graphviz.org/download/ 下载安装
import objgraph
import gc

# 基础统计
def analyze_object_types():
    """分析当前内存中的对象类型"""
    print("\n内存中最多的对象类型(Top 20):")
    objgraph.show_most_common_types(limit=20)

# 增长分析
def track_object_growth():
    """追踪对象数量增长"""
    # 第一次统计
    gc.collect()
    objgraph.show_growth(limit=10)
    
    # 创建一些对象
    leaked_list = []
    for i in range(10000):
        leaked_list.append({'data': [0] * 100})
    
    # 第二次统计
    print("\n执行操作后的对象增长:")
    objgraph.show_growth(limit=10)

# 运行分析
analyze_object_types()
track_object_growth()

3.2 实战案例:追踪循环引用

import objgraph
import os

class Node:
    """链表节点(可能产生循环引用)"""
    def __init__(self, value):
        self.value = value
        self.next = None
        self.prev = None

class CircularList:
    """循环链表(演示内存泄漏)"""
    def __init__(self):
        self.head = None
        self.size = 0
    
    def add(self, value):
        new_node = Node(value)
        if not self.head:
            self.head = new_node
            new_node.next = new_node
            new_node.prev = new_node
        else:
            tail = self.head.prev
            tail.next = new_node
            new_node.prev = tail
            new_node.next = self.head
            self.head.prev = new_node
        
        self.size += 1

# 创建循环引用
def create_circular_references():
    """创建包含循环引用的对象"""
    lists = []
    for i in range(10):
        circular_list = CircularList()
        for j in range(100):
            circular_list.add(f"data_{i}_{j}")
        lists.append(circular_list)
    
    return lists

# 可视化分析
def visualize_references():
    """生成对象引用关系图"""
    # 创建对象
    leaked_lists = create_circular_references()
    
    # 分析第一个列表
    target = leaked_lists[0]
    
    print("\n生成对象引用关系图...")
    
    # 生成反向引用链(是什么在引用这个对象)
    output_file = '/home/claude/backrefs.png'
    objgraph.show_backrefs(
        [target],
        max_depth=3,
        filename=output_file,
        refcounts=True
    )
    print(f"反向引用图已保存: {output_file}")
    
    # 生成前向引用链(这个对象引用了什么)
    output_file = '/home/claude/refs.png'
    objgraph.show_refs(
        [target.head],
        max_depth=3,
        filename=output_file,
        refcounts=True
    )
    print(f"前向引用图已保存: {output_file}")
    
    return leaked_lists

# 运行可视化
leaked = visualize_references()

# 查看引用链
print("\n详细引用链分析:")
objgraph.show_chain(
    objgraph.find_backref_chain(
        leaked[0],
        objgraph.is_proper_module
    ),
    filename='/home/claude/chain.png'
)

3.3 综合案例:Django 应用内存泄漏诊断

import objgraph
import tracemalloc
import gc
from functools import wraps

class MemoryLeakDetector:
    """内存泄漏检测器(生产环境友好)"""
    
    def __init__(self, threshold_mb=50):
        self.threshold_mb = threshold_mb
        self.baseline = None
        self.snapshots = []
        
    def start_monitoring(self):
        """开始监控"""
        gc.collect()
        tracemalloc.start()
        self.baseline = tracemalloc.take_snapshot()
        print("✅ 内存监控已启动")
    
    def check_memory(self, label="checkpoint"):
        """检查内存状态"""
        if not self.baseline:
            print("⚠️  请先调用 start_monitoring()")
            return
        
        gc.collect()
        current = tracemalloc.take_snapshot()
        self.snapshots.append((label, current))
        
        # 计算增量
        stats = current.compare_to(self.baseline, 'lineno')
        total_increase = sum(s.size_diff for s in stats if s.size_diff > 0)
        increase_mb = total_increase / 1024 / 1024
        
        print(f"\n{'='*70}")
        print(f"检查点: {label}")
        print(f"内存增长: {increase_mb:.2f} MB")
        
        if increase_mb > self.threshold_mb:
            print("🚨 检测到内存泄漏!")
            self._analyze_leak(stats)
        else:
            print("✅ 内存使用正常")
        print(f"{'='*70}")
    
    def _analyze_leak(self, stats):
        """详细分析泄漏"""
        print("\n内存增长最多的位置(Top 10):")
        for i, stat in enumerate(stats[:10], 1):
            if stat.size_diff > 0:
                print(f"\n#{i}: {stat.traceback.format()[0]}")
                print(f"  增长: {stat.size_diff / 1024 / 1024:.2f} MB")
                print(f"  对象: +{stat.count_diff}")
        
        # 使用 objgraph 分析对象类型
        print("\n对象类型增长分析:")
        objgraph.show_growth(limit=10)
    
    def generate_report(self, output_dir='/home/claude'):
        """生成完整报告"""
        print(f"\n生成内存泄漏报告...")
        
        # 1. 对象类型统计
        print("\n1. 当前内存对象类型分布:")
        objgraph.show_most_common_types(limit=15)
        
        # 2. 查找潜在泄漏对象
        print("\n2. 查找可疑对象...")
        suspicious_types = ['dict', 'list', 'tuple', 'set']
        
        for obj_type in suspicious_types:
            objects = objgraph.by_type(obj_type)
            if len(objects) > 10000:
                print(f"\n⚠️  {obj_type} 对象数量异常: {len(objects)}")
                
                # 随机采样分析
                sample = objects[0] if objects else None
                if sample:
                    output_file = os.path.join(output_dir, f'{obj_type}_refs.png')
                    objgraph.show_refs(
                        [sample],
                        filename=output_file,
                        max_depth=2
                    )
                    print(f"  引用图已保存: {output_file}")
        
        # 3. tracemalloc 详细报告
        if self.snapshots:
            latest_label, latest_snapshot = self.snapshots[-1]
            print(f"\n3. 最新快照分析 ({latest_label}):")
            
            top_stats = latest_snapshot.statistics('lineno')
            print("\n内存占用 Top 10:")
            for i, stat in enumerate(top_stats[:10], 1):
                frame = stat.traceback[0]
                print(f"\n#{i}: {frame.filename}:{frame.lineno}")
                print(f"  大小: {stat.size / 1024 / 1024:.2f} MB")
                print(f"  对象数: {stat.count}")

# 装饰器:自动检测函数内存泄漏
def detect_leak(detector):
    """装饰器:自动检测函数执行后的内存变化"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            gc.collect()
            before = tracemalloc.take_snapshot()
            
            result = func(*args, **kwargs)
            
            gc.collect()
            after = tracemalloc.take_snapshot()
            
            stats = after.compare_to(before, 'lineno')
            total_increase = sum(s.size_diff for s in stats if s.size_diff > 0)
            increase_mb = total_increase / 1024 / 1024
            
            if increase_mb > 1:  # 阈值 1MB
                print(f"\n⚠️  {func.__name__} 可能存在内存泄漏")
                print(f"   内存增长: {increase_mb:.2f} MB")
                
                for stat in stats[:3]:
                    if stat.size_diff > 0:
                        print(f"   {stat}")
            
            return result
        return wrapper
    return decorator

# 使用示例
detector = MemoryLeakDetector(threshold_mb=10)
detector.start_monitoring()

@detect_leak(detector)
def process_large_dataset():
    """模拟数据处理(有泄漏)"""
    cache = {}
    for i in range(50000):
        cache[f"key_{i}"] = [0] * 1000  # 泄漏点
    return len(cache)

# 测试
result = process_large_dataset()
detector.check_memory("After processing")
detector.generate_report()

四、实战调试流程与最佳实践

4.1 标准诊断流程

import tracemalloc
import objgraph
import gc
import psutil
import os

class MemoryDebugger:
    """内存调试完整工作流"""
    
    @staticmethod
    def step1_confirm_leak():
        """步骤1:确认是否真的有内存泄漏"""
        print("="*70)
        print("步骤 1: 确认内存泄漏")
        print("="*70)
        
        process = psutil.Process(os.getpid())
        
        baseline = process.memory_info().rss / 1024 / 1024
        print(f"基线内存: {baseline:.2f} MB")
        
        # 模拟工作负载
        for iteration in range(5):
            # 执行业务逻辑
            _ = [0] * 1000000
            gc.collect()
            
            current = process.memory_info().rss / 1024 / 1024
            increase = current - baseline
            
            print(f"迭代 {iteration + 1}: {current:.2f} MB (+{increase:.2f} MB)")
            
            if increase > 100:
                print("⚠️  确认内存持续增长,可能存在泄漏!")
                return True
        
        print("✅ 内存使用正常")
        return False
    
    @staticmethod
    def step2_locate_source():
        """步骤2:使用 tracemalloc 定位泄漏源"""
        print("\n" + "="*70)
        print("步骤 2: 定位泄漏源")
        print("="*70)
        
        tracemalloc.start()
        snapshot1 = tracemalloc.take_snapshot()
        
        # 执行可疑代码
        leaked_data = []
        for i in range(10000):
            leaked_data.append([0] * 1000)
        
        snapshot2 = tracemalloc.take_snapshot()
        
        top_stats = snapshot2.compare_to(snapshot1, 'lineno')
        
        print("\n内存增长最多的代码位置:")
        for stat in top_stats[:5]:
            if stat.size_diff > 0:
                print(f"\n{stat.traceback.format()[0]}")
                print(f"增长: {stat.size_diff / 1024 / 1024:.2f} MB")
        
        tracemalloc.stop()
    
    @staticmethod
    def step3_analyze_objects():
        """步骤3:使用 objgraph 分析对象关系"""
        print("\n" + "="*70)
        print("步骤 3: 分析对象关系")
        print("="*70)
        
        # 查看对象增长
        gc.collect()
        print("\n初始对象统计:")
        objgraph.show_growth(limit=10)
        
        # 创建泄漏
        global leaked_cache
        leaked_cache = {}
        for i in range(5000):
            leaked_cache[i] = [0] * 1000
        
        print("\n操作后对象增长:")
        objgraph.show_growth(limit=10)
        
        # 生成引用图
        if leaked_cache:
            sample_obj = list(leaked_cache.values())[0]
            objgraph.show_backrefs(
                [sample_obj],
                filename='/home/claude/leak_backrefs.png',
                max_depth=3
            )
            print("\n引用图已生成: /home/claude/leak_backrefs.png")
    
    @staticmethod
    def step4_verify_fix():
        """步骤4:验证修复效果"""
        print("\n" + "="*70)
        print("步骤 4: 验证修复")
        print("="*70)
        
        tracemalloc.start()
        before = tracemalloc.take_snapshot()
        
        # 修复后的代码(使用弱引用或限制缓存大小)
        from collections import OrderedDict
        
        class LRUCache:
            def __init__(self, max_size=1000):
                self.cache = OrderedDict()
                self.max_size = max_size
            
            def set(self, key, value):
                if key in self.cache:
                    self.cache.move_to_end(key)
                self.cache[key] = value
                if len(self.cache) > self.max_size:
                    self.cache.popitem(last=False)
        
        cache = LRUCache(max_size=1000)
        for i in range(10000):
            cache.set(i, [0] * 1000)
        
        after = tracemalloc.take_snapshot()
        stats = after.compare_to(before, 'lineno')
        
        total_increase = sum(s.size_diff for s in stats if s.size_diff > 0)
        print(f"\n修复后内存增长: {total_increase / 1024 / 1024:.2f} MB")
        
        if total_increase / 1024 / 1024 < 10:
            print("✅ 修复有效,内存控制在合理范围")
        else:
            print("⚠️  仍需进一步优化")
        
        tracemalloc.stop()

# 执行完整诊断流程
if __name__ == '__main__':
    debugger = MemoryDebugger()
    
    if debugger.step1_confirm_leak():
        debugger.step2_locate_source()
        debugger.step3_analyze_objects()
        debugger.step4_verify_fix()

4.2 生产环境监控方案

import tracemalloc
import threading
import time
from datetime import datetime

class ProductionMemoryMonitor:
    """生产环境内存监控(低开销)"""
    
    def __init__(self, check_interval=300, alert_threshold_mb=500):
        self.check_interval = check_interval
        self.alert_threshold_mb = alert_threshold_mb
        self.running = False
        self.thread = None
        
    def start(self):
        """启动监控线程"""
        if self.running:
            return
        
        self.running = True
        tracemalloc.start()
        
        self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.thread.start()
        print(f"✅ 内存监控已启动(每 {self.check_interval} 秒检查一次)")
    
    def stop(self):
        """停止监控"""
        self.running = False
        if self.thread:
            self.thread.join()
        tracemalloc.stop()
        print("⏹  内存监控已停止")
    
    def _monitor_loop(self):
        """监控循环"""
        baseline = None
        
        while self.running:
            try:
                snapshot = tracemalloc.take_snapshot()
                
                if baseline is None:
                    baseline = snapshot
                else:
                    self._check_memory(baseline, snapshot)
                
                time.sleep(self.check_interval)
                
            except Exception as e:
                print(f"监控出错: {e}")
    
    def _check_memory(self, baseline, current):
        """检查内存状态"""
        stats = current.compare_to(baseline, 'lineno')
        total_increase = sum(s.size_diff for s in stats if s.size_diff > 0)
        increase_mb = total_increase / 1024 / 1024
        
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        if increase_mb > self.alert_threshold_mb:
            print(f"\n🚨 [{timestamp}] 内存告警!")
            print(f"   增长: {increase_mb:.2f} MB")
            print(f"   Top 3 增长位置:")
            
            for i, stat in enumerate(stats[:3], 1):
                if stat.size_diff > 0:
                    print(f"   #{i}: {stat.traceback.format()[0]}")
                    print(f"       +{stat.size_diff / 1024 / 1024:.2f} MB")
            
            # 可以在这里发送告警邮件或消息
        else:
            print(f"✅ [{timestamp}] 内存正常 (+{increase_mb:.2f} MB)")

# 使用示例
monitor = ProductionMemoryMonitor(check_interval=10, alert_threshold_mb=50)
monitor.start()

# 模拟应用运行
try:
    leaked = []
    for i in range(100):
        leaked.append([0] * 100000)
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    monitor.stop()

五、总结与最佳实践

5.1 工具选择决策树

发现内存持续增长
    ↓
使用 psutil 确认物理内存增长
    ↓
tracemalloc 定位代码位置
    ├─ 找到明确位置 → 修复代码
    └─ 位置不明确
        ↓
    objgraph 分析对象关系
        ├─ 发现循环引用 → 使用弱引用或手动打破
        ├─ 发现缓存无限增长 → 添加 LRU 或 TTL
        └─ 发现资源未关闭 → 使用上下文管理器

5.2 防御性编程建议

# 1. 使用上下文管理器
with open('file.txt') as f:
    data = f.read()

# 2. 限制缓存大小
from functools import lru_cache

@lru_cache(maxsize=1000)
def expensive_function(arg):
    return arg ** 2

# 3. 使用弱引用
import weakref

class Cache:
    def __init__(self):
        self._cache = weakref.WeakValueDictionary()

# 4. 定期清理
def cleanup_old_data(cache, max_age_seconds=3600):
    now = time.time()
    to_delete = [
        k for k, v in cache.items()
        if now - v['timestamp'] > max_age_seconds
    ]
    for k in to_delete:
        del cache[k]

# 5. 使用生成器处理大数据
def process_large_file(filename):
    with open(filename) as f:
        for line in f:  # 逐行处理,不加载整个文件
            yield process_line(line)

到此这篇关于一文带你系统掌握Python中内存泄漏的诊断与解决方案的文章就介绍到这了,更多相关Python内存泄漏内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文