Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis 分布式锁

从原理到实践分析 Redis 分布式锁的多种实现方案

作者:Ascend1797

在分布式系统中,为了保证多个进程或线程之间的数据一致性和正确性,需要使用锁来实现互斥访问共享资源,然而,使用本地锁在分布式系统中存在问题,这篇文章主要介绍了从原理到实践分析 Redis 分布式锁的多种实现方案,需要的朋友可以参考下

一、为什么要用分布式锁

在分布式系统中,为了保证多个进程或线程之间的数据一致性和正确性,需要使用锁来实现互斥访问共享资源。然而,使用本地锁在分布式系统中存在问题。

本地锁的问题

        比如商品服务A和服务B同时获取到库存数量为10的商品信息。商品服务A和服务B同时进行扣减库存操作,分别将库存数量减少了1。商品服务A和服务B均修改了库存数量为9,然后将数据写入数据库中。
        由于使用本地锁,商品服务A和服务B之间没有进行协调,因此就会出现数据不一致的问题。可能出现以下情况:商品服务A先将库存数量9写入数据库,然后商品服务B也将库存数量9写入数据库,商品服务B先将库存数量9写入数据库,然后商品服务A也将库存数量9写入数据库,结果,整个系统中库存数量实际只完成了一次扣减,最终库存数量卖出2份后,还剩下9,出现了数据不一致的情况。

        相比之下,分布式锁可以解决上述问题。分布式锁可以在多个节点之间协调锁的使用,确保在分布式系统中多个进程或线程互斥访问共享资源,并保证了全局唯一性,避免了死锁和锁竞争问题,同时也能够提高系统的吞吐量和性能。

二、什么是分布式锁

        分布式锁是一种用于在分布式系统中协调多个进程或线程之间对共享资源的互斥访问的机制。在分布式系统中,由于各个节点之间没有共享内存,因此无法使用传统的本地锁机制来实现进程或线程的同步,所以需要使用分布式锁来解决这个问题。

        举一个生活中的例子,假设我们去乘坐高铁,首先要进行检票进站,但有很多人都想进站。为了避免大家同时挤进去,高铁站会设置检票闸机,每次只允许一人检票通过,当有人检票进入时,其他人必须等待,直到检票成功进入后,闸机会再次反锁。后面的人再尝试检票获取检票闸机的进入权。这里的检票闸机就是高铁站的一把锁。

 来看下分布式锁的基本原理,如下图所示:

我们来分析下上图的分布式锁:

大白话解释:所有请求的线程都去同一个地方“占坑”,如果有坑位,就执行业务逻辑,没有坑位,就需要其他线程释放“坑位”。这个坑位是所有线程可见的,可以把这个坑位放到 Redis 缓存或者数据库,这篇讲的就是如何用 Redis 做“分布式坑位”

分布式锁的好处

三、Redis 的 SETNX 

        为了使用分布式锁,需要我们找到一个可靠的第三方中间件。Redis刚好可以用来作为分布式锁的提供者。

主要原因在于 Redis 具有以下特点:

高性能:Redis 是一种内存数据库,数据存储在内存中,读写速度非常快,可以快速响应锁的获取和释放请求。

原子操作:Redis 支持原子操作,例如 SETNX(SET if Not eXists)命令可以实现“只有在键不存在时设置键值”的操作,可以保证同时只会有一个客户端成功获取到锁,并且避免了因为执行多个操作而导致的竞态条件问题。

可靠性高:Redis 可以进行主从复制和持久化备份等操作,可以确保即使出现网络中断或 Redis 实例宕机的情况,也可以保证分布式锁的正确性和一致性。

        基于以上特点,我们可以使用 Redis 来实现分布式锁的机制。具体做法是通过 SETNX 命令在 Redis 中创建一个键值对作为锁,当有其他客户端尝试获取锁时,如果该键值对已经存在,则表示锁已经被其他客户端持有;反之,则表示当前客户端获取锁成功。

        Redis 中的 SETNX 命令用于设置指定键的值,但是只有在该键不存在时才进行设置。如果该键已经存在,则 SETNX 命令不会对其进行任何操作。

SETNX 的语法如下:

SETNX key value

SETNX 的源码实现比较简单,其实现过程如下:

SETNX 命令的 C 语言实现如下:

void setnxCommand(client *c) {
    robj *o;
    int nx = c->argc == 3; /* 如果参数个数为 3,说明设置 NX(key 不存在才设置) */
    long long expire = 0; /* 默认不设置过期时间 */
    int retval;
    if (nx) {
        /* NX 模式下检查 key 是否已经存在 */
        if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
            addReply(c,shared.czero);
            return;
        }
    } else {
        /* XX 模式下检查 key 是否不存在 */
        if (lookupKeyWrite(c->db,c->argv[1]) == NULL) {
            addReply(c,shared.czero);
            return;
        }
    }
    /* 尝试将字符串型或整型数字转换为 long long 型数字 */
    if (getTimeoutFromObjectOrReply(c,c->argv[3],&expire,UNIT_SECONDS)
        != C_OK) return;
    /* 值为空则返回错误 */
    if (checkStringLength(c,c->argv[2]->ptr,sdslen(c->argv[2]->ptr)) != C_OK)
        return;
    /* 尝试将键值对插入到数据库中 */
    o = createStringObject(c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
    retval = dictAdd(c->db->dict,c->argv[1],o);
    if (retval == DICT_OK) {
        incrRefCount(o);
        /* 设置过期时间 */
        if (expire) setExpire(c->db,c->argv[1],mstime()+expire);
        server.dirty++;
        addReply(c, shared.cone);
    } else {
        decrRefCount(o);
        addReply(c, shared.czero);
    }
}

从源码实现可以看出,SETNX 命令的执行过程非常快速,由于 Redis 存储数据是采用字典结构,在判断 key 是否存在时可以达到 O(1) 的时间复杂度,因此 SETNX 命令的性能很高。

四、使用Redis SETNX 实现分布式锁的方案

SETNX 方案流程图

如上图所示,使用 Redis 的 SETNX 命令来实现分布式锁的过程如下:

代码示例

@Service
public class ProductService {
    private final RedisTemplate<String, String> redisTemplate;
    @Autowired
    public ProductService(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    /**
     * 扣减库存
     *
     * @param productId 商品ID
     * @param quantity  数量
     * @return true 扣减成功,false 扣减失败
     */
    public boolean decreaseStock(String productId, int quantity) {
        String lockKey = "stock_" + productId;
        while (true) {
            Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, "", 10, TimeUnit.SECONDS);
            if (lockResult != null && lockResult) {
                try {
                    String stockKey = "product_" + productId;
                    String stockStr = redisTemplate.opsForValue().get(stockKey);
                    if (StringUtils.isEmpty(stockStr)) {
                        // 库存不存在或已过期
                        return false;
                    }
                    int stock = Integer.parseInt(stockStr);
                    if (stock < quantity) {
                        // 库存不足
                        return false;
                    }
                    int newStock = stock - quantity;
                    redisTemplate.opsForValue().set(stockKey, String.valueOf(newStock));
                    return true;
                } finally {
                    redisTemplate.delete(lockKey);
                }
            }
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

         在 decreaseStock() 方法中,首先定义了一个 lockKey,用于对商品库存进行加锁。进入 while 循环后,使用 Redis 的 setIfAbsent() 方法尝试获取锁,如果返回值为 true,则表示成功获取锁。在成功获取锁后,再从 Redis 中获取商品库存,判断库存是否充足,如果充足则扣减库存并返回 true;否则直接返回 false。最后,在 finally 块中删除加锁的 key。

        如果获取锁失败,则等待 10 秒后再次尝试获取锁,直到获取成功为止。

SETNX 实现分布式锁的缺陷

使用 Redis SETNX 实现分布式锁可能存在以下缺陷:

五、Redis SETNX优化方案 SETNXEX

针对使用 Redis SETNX 实现分布式锁可能出现死锁的情况,,可以使用SETNXEX进行优化,Redis SETNXEX 命令是 Redis 提供的一个原子操作指令,用于设置一个有过期时间的字符串类型键值对,当且仅当该键不存在时设置成功,返回 1,否则返回 0。SETNXEX 命令的语法如下:

SETNXEX key seconds value

其中,key 是键名;seconds 为整数,表示键值对的过期时间(单位为秒);value 是键值。

源码分析:

实现 SETNXEX 命令的关键在于如何保证该操作的原子性和一致性。其实现过程如下:

Redis 在底层使用 SETNX 和 SETEX 命令实现 SETNXEX 命令,它的 C 语言实现代码如下:

void setnxexCommand(client *c) {
    robj *key = c->argv[1], *val = c->argv[3];
    long long expire = strtoll(c->argv[2]->ptr,NULL,10);
    expire *= 1000;
    if (getExpire(c,key) != -1) {
        addReply(c, shared.czero);
        return;
    }
    setKey(c,c->db,key,val,LOOKUP_NOTOUCH|LOOKUP_EX|LOOKUP_NX,0,0,NULL);
    if (c->flags & CLIENT_MULTI) {
        addReply(c, shared.cone);
        return;
    }
    server.dirty++;
    if (expire) setExpire(c,c->db,key,mstime()+expire);
    addReply(c, shared.cone);
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
}

        在这个代码中,首先从客户端传来的参数中获取 key、value 和 expire 值,并通过 getExpire 函数检查键是否已经存在。如果已经存在,则返回 0;否则,调用 setKey 函数将键值对设置为 value,并加上过期时间 expire。当然,这里的过期时间是以毫秒为单位的,需要转换成 Redis 的标准格式。最后,通过 addReply 函数向客户端发送成功的响应消息,并通过 notifyKeyspaceEvent 函数发送键空间通知。 

        需要注意的是,虽然 SETNXEX 被称为“原子操作”,但实际上在高并发场景下,SETNX 和 SETEX 操作之间可能会发生竞争问题,导致 SETNX 和 SETEX 操作不具备原子性。如果在分布式场景下需要保证 SETNXEX 的原子性,还需要使用分布式锁等机制来避免竞争问题。因此,在使用 SETNXEX 命令时,需要根据具体情况,评估其安全性和可靠性,采用合适的解决方案。

六、使用Redis SETNXEX 实现分布式锁的方案

SETNXEX 方案流程图

如上图所示,使用 Redis 的 SETNXEX 命令来实现分布式锁的过程如下

 代码示例

@Component
public class StockService {
    private final Logger logger = LoggerFactory.getLogger(StockService.class);
    private final String LOCK_KEY_PREFIX = "stock:lock:";
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    /**
     * 扣减库存
     * @param productId 商品ID
     * @param num 扣减数量
     */
    public boolean reduceStock(Long productId, int num) {
        // 构造锁的key
        String lockKey = LOCK_KEY_PREFIX + productId;
        // 构造锁的value,这里使用当前线程的ID
        String lockValue = String.valueOf(Thread.currentThread().getId());
        try {
            // 尝试获取锁,设置过期时间为10秒
            Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10L, TimeUnit.SECONDS);
            if (!locked) {
                // 获取锁失败,等待10秒后重新尝试获取锁
                Thread.sleep(10000);
                return reduceStock(productId, num);
            }
            // 获取锁成功,执行扣减库存代码
            // TODO ... 扣减库存代码
            return true;
        } catch (InterruptedException e) {
            logger.error("Failed to acquire stock lock", e);
            return false;
        } finally {
            // 释放锁
            if (lockValue.equals(redisTemplate.opsForValue().get(lockKey))) {
                redisTemplate.delete(lockKey);
            }
        }
    }
}

        上述代码中,首先构造了锁的key和value,然后使用 RedisTemplate 的 setIfAbsent 方法尝试获取锁。如果获取锁失败,则线程会等待10秒后重新尝试获取锁,直到获取锁成功为止。如果获取锁成功,则执行扣减库存的业务逻辑,待操作完成后释放锁。

SETNXEX 实现分布式锁的缺陷 

七、Redis SETNXEX 实现分布式锁缺陷的优化方案

        针对SETNXEX锁过期问题的优化方案:在执行业务逻辑前,我们设置锁的过期时间为 30 秒,并启动一个定时任务续租锁,以防止锁因长时间持有而超时失效。
在 finally 块中释放锁,首先判断当前线程是否持有该锁,如果是则删除该锁。

SETNXEX 优化方案流程图

如上图所示,当有两个线程同时请求获取锁时,执行流程如下:

 代码示例

@Component
public class StockService {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    /**
     * 扣减库存
     *
     * @param stockId 库存 ID
     * @param num 扣减数量
     * @return 是否扣减成功
     */
    public boolean reduceStock(String stockId, int num) throws InterruptedException {
        // 构造锁的名称
        String lockKey = "stock_lock_" + stockId;
        // 获取当前线程 ID
        String threadId = String.valueOf(Thread.currentThread().getId());
        try {
            // 使用 SETNXEX 命令申请锁
            Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, threadId, 30, TimeUnit.SECONDS);
            if (!lockResult) {
                // 如果获取锁失败,则等待一段时间后重试
                Thread.sleep(10000);
                return reduceStock(stockId, num);
            }
            // 设置锁的过期时间为 30 秒,并启动一个定时任务续租锁
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                Long expireResult = redisTemplate.getExpire(lockKey);
                if (expireResult < 10) {
                    redisTemplate.expire(lockKey, expireResult + 10, TimeUnit.SECONDS);
                }
            }, 10, 10, TimeUnit.SECONDS);
            // TODO:执行业务逻辑,例如扣减库存
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放锁
            if (threadId.equals(redisTemplate.opsForValue().get(lockKey))) {
                redisTemplate.delete(lockKey);
            }
        }
        return false;
    }
}

        在上面的代码示例中,我们使用了 redisTemplateopsForValue().setIfAbsent() 方法来申请锁。如果获取锁失败,则等待 10 秒后重新尝试获取锁。在获取锁成功后,我们设置锁的过期时间为 30 秒,并启动一个定时任务续租锁,以防止锁因长时间持有而超时失效。在执行完业务逻辑后,返回 true 表示扣减成功。

        在释放锁时,我们首先通过 redisTemplate.opsForValue().get(lockKey) 方法获取当前持有锁的线程 ID,然后判断当前线程是否持有该锁,如果是则删除该锁。这里使用了 redisTemplate.delete() 方法来删除锁。

方案的缺陷

针对上述问题的解决方案 ,下篇见。

到此这篇关于从原理到实践分析 Redis 分布式锁的多种实现方案的文章就介绍到这了,更多相关 Redis 分布式锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文