python-redis-lock实现锁自动续期的源码逻辑
作者:重生之我是蔡经理
python-redis-lock简介
python-redis-lock是一个python的第三方库,基于Redis,封装了分布式锁的逻辑,提供了更高级的API来简化锁的获取、保持和释放过程。包括自动续期、锁超时、重入锁等功能。
相比于直接使用redis的setnx,避免了写额外代码来实现锁的复杂逻辑。
锁的续期
在使用分布式锁的情况下,如果某个服务器A的业务还未执行完(可能因为网络拥塞、数据量突然变大等各种原因),但是锁过期了,可能引发额外的问题。例如另一台服务器B发现锁释放了,于是加锁并开始执行业务,从而导致出现问题。
更有可能导致服务器A业务在执行完后去释放锁时,意外地释放了服务器B加的锁,导致服务器C进入,从而引发问题(在未使用唯一性ID表示加锁者的情况下)。
所以对锁的过期时间来续期是很有必要的,它确保了锁在业务执行完后才释放。
python-redis-lock实现自动续期的源码分析
首先在配置python-redis-lock实例时,有两个参数:expire int 和 auto_renewal boolean。
expire:锁过期时间,单位为秒。
auto_renewal:是否开启自动续期锁。
python-redis-lock实例在初始化时,针对这两个参数的源码:
class Lock(object): def __init__(self, redis_client, name, expire=None, auto_renewal=False, ...): ... if expire: expire = int(expire) ... self._expire = expire self._lock_renewal_interval = float(expire) * 2/3 if auto_renewal else None ...
假设我们设置了一个过期时间30秒,开启自动续期的Lock实例,则self._expire=30秒,self._lock_renewal_interval=20秒。
当在代码层调用Lock.acquire方法时,判断self._lock_renewal_interval是否为空,如果不为空,则开启锁的自动续期(这里省略了一些内容,因为只关注续期的代码实现逻辑):
class Lock(object): def acquire(self, blocking=True, timeout=None): ... if self._lock_renewal_interval is not None: self._start_lock_renewer()
在self._start_lock_renewer方法中,基于python的threading模块,开启了一个锁自动更新线程self._lock_renewal_thread。这个线程执行self._lock_renewer方法
def _start_lock_renewer(self): ... self._lock_renewal_stop = threading.Event() # threading.Event()用于线程间的协调 self._lock_renewal_thread = threading.Thread( group=None, target=self._lock_renewer, kwargs={ 'name': self._name, 'lockref': weakref.ref(self), # 对Lock实例的弱引用 'interval': self._lock_renewal_interval, 'stop': self._lock_renewal_stop, }, ) self._lock_renewal_thread.demon = True self._lock_renewal_thread.start()
weakref.ref(self)创建当前类实例(self)的弱引用,并将其存储在变量 lockref 中。使用 weakref.ref 创建的弱引用不会阻止对象被垃圾回收。
在self._lock_renewer方法中,首先用了while循环来每次等待20秒(interval=20)再执行循环体:如果弱引用对象(即Lock实例本身)没有被垃圾回来,则执行续期的方法(lock.extend):
def _lock_renewer(name, lockref, interval, stop): while not stop.wait(timeout=interval): ... lock: "Lock" = lockref() if lock is None: break lock.extend(expire=lock._expire)
在self.extend方法中,执行了一个Lua脚本来更新锁的过期时间。
在满足self._name和self._signal的缓存键值存在且锁未过期的情况下,将当前锁的过期时间重置为expire(30秒)。
def extend(self. expire=None): if expire: expire = int(expire) error = self.extend_script( client=self._client, # 对redis的连接 keys=(self._name, self._signal), args=(self._id, expire)) ... # self.extend_script的执行逻辑: EXTEND_SCRIPT = b""" if redis.call("get", KEYS[1]) ~= ARGV[1] then return 1 elseif redis.call("ttl", KEYS[1]) < 0 then return 2 else redis.call("expire", KEYS[1], ARGV[2]) return 0 end """ cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
以上就是python-redis-lock实现锁自动续期的源码逻辑。
其中用到了多线程threading、弱引用weakref和Lua脚本等相关知识。
到此这篇关于python-redis-lock是如何实现锁自动续期的的文章就介绍到这了,更多相关python-redis-lock锁自动续期内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!