java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Redisson 可重入锁

揭秘springboot中Redisson 可重入锁的实现原理

作者:给我点时间看书

本文探究基于 Redisson 的可重入锁原理,通过使用 hash 数据结构 + Lua 脚本实现可重入的分布式锁,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

本文探究基于 Redisson 的可重入锁原理,通过 RedissonLock 类中的源码,学习如何使用 hash 数据结构 + Lua 脚本实现可重入的分布式锁。

可重入锁的业务场景

RLock lock = redissonClient.getLock("lock:business:");

void method1() {
    boolean isLock = lock.tryLock();
    if (!isLock) {
        log.error("方法 1 获取锁失败");
    }
    try {
        log.info("方法 1 获取锁成功");
        method2();
    } finally {
        log.info("方法 1 释放锁锁");
        lock.unlock();
    }
}

void method2() {
    boolean isLock = lock.tryLock();
    if (!isLock) {
        log.error("方法 2 获取锁失败");
    }
    try {
        log.info("方法 2 获取锁成功");
    } finally {
        log.info("方法 2 释放锁锁");
        lock.unlock();
    }
}

上面业务代码中,方法 1 获取锁之后,需要调用调用方法 2,方法 2 同样也需要获取锁保证线程安全,此时就需要重复获取同一个锁,这个锁就叫做可重入锁。

可重入锁的实现逻辑

分为获取锁和释放锁两部分来写

获取锁

  1. 锁存在,要判断锁标识是否是自己的线程,不是自己的线程,那就说明有别的线程已经获取到锁了,当前线程获取锁失败;如果是自己的线程标识,那就要给锁计数加一(记录获取锁的次数)并重置有效期;
  2. 锁不存在,就获取锁,添加当前线程的标识,获取锁次数加一,并设置有效期

释放锁

  1. 线程标识不是自己,锁可能过期释放了,返回释放失败
  2. 是自己,锁的计数减一,判断锁计数是否为 0,如果不为 0,重置锁有效期,执行下一段业务,如果锁计数为 0 了,就释放锁,此时流程结束。

源码分析

本文源码基于 JDK 17 + Redisson 3.39.0,只做可重入锁部分的源码探究,其余部分读者可自行深入。

依旧是分为获取锁和释放锁两部分探究,Redisson 底层使用 hash 数据结构 + Lua 脚本实现可重入的分布式锁。

在看源码之前,先看看 Redisson 源码包中与本文相关的类和接口的实现与继承关系,方便理解文章:

获取锁源码

在业务中我们使用 redissonClient 获取锁,从tryLock()方法开启 redisson 的可重入锁源码探索之旅。

// 注入 redissonClient 依赖
@Resource
private RedissonClient redissonClient;


// 业务中获取锁
RLock lock = redissonClient.getLock("lock:business");
boolean isLock = lock.tryLock();

tryLock()java.util.concurrent.locks包下的Lock接口中的方法,Redisson 对于其实现类有四个:

  1. RedissonLock 类
  2. RedissonFasterMultiLock 类
  3. RedissonMultiLock 类
  4. RedissonSpinLock 类

现在我们再深入看看 RedissonLock 类中的实现(如对其余实现类感兴趣,读者可自行阅读源码):

@Override
public boolean tryLock() {
    return get(tryLockAsync());
}

tryLock()方法是等待tryLockAsync()这个获取锁的异步操作完成,tryLockAsync()方法是org.redisson.RedissonBaseLock类中的一个方法,实现了org.redisson.api.RLockAsync接口的抽象方法。

@Override
public RFuture<Boolean> tryLockAsync() {
    return tryLockAsync(Thread.currentThread().getId());
}

tryLockAsync()方法则是调用org.redisson.api.RLockAsync接口中的重载方法,传递一个当前线程的 ID,接下来再看看 RedissonLock 这个类中,这个重载方法的具体的实现:

@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
    return getServiceManager().execute(() -> tryAcquireOnceAsync(-1, -1, null, threadId));
}

该方法内部调用了getServiceManager().execute(),异步执行tryAcquireOnceAsync()方法。

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    CompletionStage<Boolean> acquiredFuture;
    if (leaseTime > 0) {
        acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
        acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }

 // ......其余代码
    
}

可以看到在业务中如果使用无参的lock.tryLock()方法获取锁,那么给tryLockInnerAsync()方法传递的五个参数:

  1. 等待时间 waitTime 设置为 -1
  2. 超时时间默认 30 * 1000(RedissonLock 实例化的时候指定)
  3. 时间单位默认毫秒
  4. 第四个参数当前线程 ID
  5. 最后一个参数用来执行 Lua 脚本,并返回一个布尔值

接下来就是获取锁的核心代码:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, command,
            "if ((redis.call('exists', KEYS[1]) == 0) " +
                        "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

可以看到,Redisson 底层还是使用了 Lua 脚本来实现可重入锁,我们忽略其他代码,直接看 Lua 脚本做了什么事情:

参数解释:

  1. KEYS[1]:锁的 key
  2. ARGV[1]:过期时间
  3. ARGV[2]:线程 ID

可重入锁使用 Redis 的 hash 结构存储值,file 是线程 ID,value 是获取锁的次数(value 是 file 对应的值):

if ((redis.call('exists', KEYS[1]) == 0) 
    or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then 
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 
return redis.call('pttl', KEYS[1]);
  1. 如果锁不存在或者锁存在且线程 ID 和当前线程的 ID 一致,将获取锁的次数自增 1,且重置过期时间,之后返回操作成功;
  2. 如果以上 if 条件不成立,返回锁的过期时间,锁不存在就会返回 -2。

释放锁源码

try {
    log.info("执行具体业务");
} finally {
    lock.unlock();
}

通过获取锁的源码分析,我们发现其实真正的核心代码就是那段 Lua 脚本,在此我们也不做过多的其余代码分析,直接来看核心代码:

protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
    return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "local val = redis.call('get', KEYS[3]); " +
                                "if val ~= false then " +
                                    "return tonumber(val);" +
                                "end; " +

                                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                                    "return nil;" +
                                "end; " +
                                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                                "if (counter > 0) then " +
                                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                                    "redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
                                    "return 0; " +
                                "else " +
                                    "redis.call('del', KEYS[1]); " +
                                    "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
                                    "redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
                                    "return 1; " +
                                "end; ",
                            Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
                            LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
                            getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}

其中 Redisson 底层释放锁源码依旧使用 Lua 脚本,接下来具体看看脚本都干了什么:
参数解释:

  1. KEYS[1]:锁对象的 key
  2. KEYS[2]:频道 key,发布所释放的消息
  3. KEYS[3]:闩锁 key,协调多个客户端同时解锁的情况
  4. ARGV[1]:发布的消息内容
  5. ARGV[2]:锁 key 的过期时间
  6. ARGV[3]:线程标识
  7. ARGV[4]:发布命令
  8. ARGV[5]:闩锁 key 过期时间
-- 第一段
local val = redis.call('get', KEYS[3]);
if val ~= false then 
    return tonumber(val);
end;

-- 第二段
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
    return nil;
end;

-- 第三段
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then 
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    redis.call('set', KEYS[3], 0, 'px', ARGV[5]);
    return 0; 
else 
    redis.call('del', KEYS[1]); 
    redis.call(ARGV[4], KEYS[2], ARGV[1]); 
    redis.call('set', KEYS[3], 1, 'px', ARGV[5]);
    return 1; 
end;

第一段代码是用来协调多个客户端同时解锁的请求,在本文过多涉及;

第二段代码,通过锁和线程标识判断是否是当前线程获取的锁,如果不是,返回解锁失败;

第三段代码,就是释放锁的核心代码了,执行到这里,说明锁存在且是当前线程持有的:

  1. 先把 获取锁的次数 减 1,返回更新后的 获取锁次数 。
  2. 如果次数大于 0,说明还有业务代码重入获取锁,此时重置锁的过期时间;并将闩锁 key 的值设置为 0,给一个过期时间确保其他线程在同一时刻尝试进行解锁操作;
  3. 如果次数小于等于 0,那就说明业务已经执行完毕,删除这个锁;发布消息同之其它可能等待此锁的客户端这个锁已经被释放;并将闩锁 key 的值设置为 1,给定过期时间。

总结

  1. 获取锁:每次重入,都会计数自增一;
  2. 释放锁:每次释放,都会计数减一,直到为 0,此时真正释放锁。

为什么要使用 Lua 脚本呢?

  1. Lua 脚本在 Redis 中是原子执行的,分布式环境中,不会被其它线程影响,从而保持操作的原子性和数据的一致性;
  2. 通过将复杂的逻辑封装到 Lua 脚本中,一次性执行,减少网络开销。

到此这篇关于揭秘springboot中Redisson 可重入锁的实现原理的文章就介绍到这了,更多相关Redisson 可重入锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文