Python+Redis从零打造分布式锁实战示例
作者:傻子的尴尬 IT智慧谷
引言
在分布式系统中,多个节点可能需要访问同一共享资源,这就需要一种协调机制来保证在同一时刻只有一个节点进行操作,这就是分布式锁的作用。
1. 简单实现(基于SETNX命令)
使用setnx命令(set if not exists)尝试设置一个key-value对,如果key不存在,则设置成功并返回1,表示获取到了锁。
import redis import time def acquire_lock(redis_client, lock_key): identifier = str(time.time()) + str(id(threading.current_thread())) acquired = redis_client.setnx(lock_key, identifier) return acquired def release_lock(redis_client, lock_key, identifier): pipe = redis_client.pipeline(True) while True: try: pipe.watch(lock_key) if pipe.get(lock_key) == identifier: pipe.multi() pipe.delete(lock_key) pipe.execute() return True pipe.unwatch() break except redis.exceptions.WatchError: pass return False # 使用示例 r = redis.Redis(host='localhost', port=6379, db=0) lock_key = 'mylock' if acquire_lock(r, lock_key): print("Lock acquired") # 执行临界区代码... release_lock(r, lock_key, identifier)
2. 改进:设置超时时间(expire命令)
为防止进程崩溃导致锁无法释放,可以给锁设置一个过期时间。
def acquire_lock_with_timeout(redis_client, lock_key, expire_time): identifier = str(time.time()) + str(id(threading.current_thread())) while True: result = redis_client.set(lock_key, identifier, nx=True, ex=expire_time) if result: return identifier time.sleep(0.1) # 使用示例不变,但在acquire_lock_with_timeout函数中增加了expire_time参数
3. 进一步改进:使用Lua脚本保证原子性
在释放锁的时候,需要确保删除的是自己设置的锁,且这个操作是原子的。可以使用Lua脚本来实现。
RELEASE_LOCK_SCRIPT = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ def release_lock LUA(redis_client, lock_key, identifier): release_script = redis_client.register_script(RELEASE_LOCK_SCRIPT) released = release_script(keys=[lock_key], args=[identifier]) return bool(released) # 使用示例不变
4. 更完善的实现,Redlock算法
Redlock算法的实现相对复杂,以下是一个简化的Python示例,展示了如何在多个Redis实例上尝试获取和释放锁。请注意,在实际应用中,需要确保Redis实例之间的时间同步,并且考虑网络延迟等因素。
import redis import time class RedLock: def __init__(self, masters, ttl): self.masters = [redis.Redis(host=host, port=port, db=db) for host, port, db in masters] self.quorum = len(self.masters) // 2 + 1 self.ttl = ttl def lock(self, resource_id): expiration = time.time() + self.ttl while time.time() < expiration: acquired_count = 0 locked_instances = [] # 尝试在每个Redis实例上获取锁 for master in self.masters: if master.set(resource_id, 'locked', nx=True, px=self.ttl): acquired_count += 1 locked_instances.append(master) # 如果获得超过半数以上的锁,则认为成功获取分布式锁 if acquired_count > self.quorum: return locked_instances # 清除已获取但未达到法定数量的锁 for master in locked_instances: master.delete(resource_id) # 等待一段时间后重试 time.sleep(0.1) raise Exception("Could not acquire lock") def unlock(self, resource_id, masters_with_lock): for master in masters_with_lock: # 使用Lua脚本保证原子性地删除锁(与前面简单实现中的释放锁方法类似) RELEASE_LOCK_SCRIPT = """ if redis.call("get", KEYS[1]) == "locked" then return redis.call("del", KEYS[1]) else return 0 end """ release_script = master.register_script(RELEASE_LOCK_SCRIPT) release_script(keys=[resource_id]) # 示例用法: masters = [('localhost', 6379, 0), ('localhost', 6380, 0)] # 假设有两个Redis实例 redlock = RedLock(masters, 10000) # 设置锁的有效期为10秒 resource_id = 'my_resource' try: masters_with_lock = redlock.lock(resource_id) print(f"Acquired Redlock on resource {resource_id}") # 执行临界区代码... finally: redlock.unlock(resource_id, masters_with_lock)
此示例仅为了说明Redlock算法的核心思想,并未涵盖所有边界条件和异常处理,如Redis实例宕机、网络延迟导致时间不一致等情况,请在生产环境中使用时结合实际情况进行优化和完善。
总结
通过以上介绍和示例代码,我们了解了如何基于Python与Redis实现不同层次复杂度的分布式锁。从简单的SETNX命令获取锁,到设置锁的超时时间以防止进程崩溃导致死锁,再到利用Lua脚本保证释放锁操作的原子性,以及针对大规模分布式环境考虑采用Redlock算法提高系统的容错性和安全性。这些方法可以根据实际业务场景选择合适的方式来实现分布式锁,从而有效解决多节点环境下对共享资源的竞争问题。随着技术的发展和需求的变化,分布式锁的实现方式也将持续演进和完善,为构建更稳定、高效的分布式系统提供有力支撑。
以上就是Python+Redis从零打造分布式锁实战示例的详细内容,更多关于Python Redis分布式锁的资料请关注脚本之家其它相关文章!