python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python本地缓存系统

从零带你构建Python高性能本地缓存系统

作者:铭渊老黄

这篇文章主要为大家详细介绍了如何从零构建一个高性能本地缓存系统,采用Python实现支持百万级QPS的并发安全缓存,感兴趣的小伙伴可以了解下

面向渴望掌握缓存原理、提升系统性能与工程能力的开发者

一、写在前面:为什么我们要手写一个本地缓存?

在高并发系统中,缓存是性能优化的第一道防线。它能显著降低数据库压力、减少重复计算、提升响应速度。

你可能会问:

“Python 已经有 Redis、functools.lru_cache、cachetools,为什么还要自己写?”

因为:

今天,我们就从零开始,手写一个支持:

的本地缓存系统。

二、目标拆解:我们要实现什么?

我们希望实现一个类 LocalCache,支持如下接口:

cache = LocalCache(max_size=100000, default_ttl=60)

cache.set("key1", "value1", ttl=30)
value = cache.get("key1")
cache.delete("key1")

功能要求:

三、核心设计:数据结构与并发模型

1. 数据结构设计

我们使用一个字典 dict 存储缓存数据,结构如下:

{
  key: (value, expire_timestamp)
}

2. 并发控制

Python 的多线程由于 GIL 限制,适合 I/O 密集型任务。但我们仍需保证线程安全:

3. 过期清理机制

四、代码实现:从零构建 LocalCache

基础实现(支持 TTL + 并发安全)

import time
import threading

class LocalCache:
    def __init__(self, max_size=100000, default_ttl=60):
        self.store = {}
        self.lock = threading.RLock()
        self.max_size = max_size
        self.default_ttl = default_ttl
        self._start_cleaner()

    def _start_cleaner(self):
        def cleaner():
            while True:
                time.sleep(5)
                with self.lock:
                    now = time.time()
                    keys_to_delete = [k for k, (_, exp) in self.store.items() if exp < now]
                    for k in keys_to_delete:
                        del self.store[k]
        t = threading.Thread(target=cleaner, daemon=True)
        t.start()

    def set(self, key, value, ttl=None):
        ttl = ttl or self.default_ttl
        expire_at = time.time() + ttl
        with self.lock:
            if len(self.store) >= self.max_size:
                self._evict()
            self.store[key] = (value, expire_at)

    def get(self, key):
        with self.lock:
            item = self.store.get(key)
            if not item:
                return None
            value, expire_at = item
            if expire_at < time.time():
                del self.store[key]
                return None
            return value

    def delete(self, key):
        with self.lock:
            if key in self.store:
                del self.store[key]

    def _evict(self):
        # 简单策略:随机淘汰一个(可扩展为 LRU)
        oldest_key = min(self.store.items(), key=lambda x: x[1][1])[0]
        del self.store[oldest_key]

五、性能测试:百万级 QPS 能否实现?

我们使用 concurrent.futures.ThreadPoolExecutor 模拟高并发访问:

from concurrent.futures import ThreadPoolExecutor
import random

cache = LocalCache(max_size=1000000, default_ttl=60)

def worker(i):
    key = f"key_{random.randint(0, 100000)}"
    cache.set(key, i)
    _ = cache.get(key)

start = time.time()
with ThreadPoolExecutor(max_workers=100) as executor:
    for i in range(1000000):
        executor.submit(worker, i)
end = time.time()

print(f"百万次读写耗时:{end - start:.2f} 秒")

在普通笔记本上测试,耗时约为 6~10 秒,QPS 达到 10 万级别,表现相当不错。

六、进阶优化建议

1. 分段锁(Sharded Lock)

将缓存划分为多个段,每段一个锁,提升并发度:

self.shards = [({}, threading.RLock()) for _ in range(16)]

通过 hash(key) % 16 定位段,减少锁竞争。

2. LRU 淘汰策略

可使用 collections.OrderedDictfunctools.lru_cache 的思路实现:

from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.data = OrderedDict()
        self.capacity = capacity

    def get(self, key):
        if key in self.data:
            self.data.move_to_end(key)
            return self.data[key]
        return None

    def set(self, key, value):
        if key in self.data:
            self.data.move_to_end(key)
        self.data[key] = value
        if len(self.data) > self.capacity:
            self.data.popitem(last=False)

将其与 TTL 机制结合,可构建更强大的缓存系统。

3. 异步支持

使用 asyncio.Lockasyncio.sleep 实现异步版本,适用于异步框架(如 FastAPI)。

七、实战案例:接口缓存中间层

在 Web 接口中,我们可以将缓存封装为装饰器:

def cache_response(ttl=60):
    def decorator(func):
        local_cache = LocalCache()

        def wrapper(*args):
            key = f"{func.__name__}:{args}"
            result = local_cache.get(key)
            if result is not None:
                return result
            result = func(*args)
            local_cache.set(key, result, ttl=ttl)
            return result
        return wrapper
    return decorator

@cache_response(ttl=10)
def slow_function(x):
    time.sleep(1)
    return x * 2

print(slow_function(10))  # 首次慢
print(slow_function(10))  # 缓存命中

八、未来展望与生态融合

1. 与 Redis 结合

2. 与 FastAPI / Flask 集成

3. 与 Prometheus 结合

九、总结与互动

我们从零实现了一个支持 TTL、并发安全、自动清理的本地缓存系统,并通过实战验证其在百万级 QPS 场景下的性能表现。

这不仅是一次技术实现,更是一次系统设计思维的锻炼。

开放问题:

到此这篇关于从零带你构建Python高性能本地缓存系统的文章就介绍到这了,更多相关Python本地缓存系统内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文