java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Redisson分布式锁原理分析

Redisson之分布式锁原理全面分析

作者:Charge8

这篇文章主要介绍了Redisson分布式锁原理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

Redisson是一个 Redis的开源客户端,也提供了分布式锁的实现。

Redisson官网:

Redisson 分布式锁使用

Redisson分布式锁使用起来还是蛮简单的。

1、添加 Redisson 配置类

引入依赖:

		<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.16.5</version>
        </dependency>

创建 Redisson 配置类,注入 RedissonClient客户端。

@Configuration
public class RedissonConfig {

	@Value("${spring.redis.host}")
	private String host;

	@Value("${spring.redis.port}")
	private String port;

	@Value("${spring.redis.password}")
	private String password;

	@Bean
	public RedissonClient getRedisson() {
		Config config = new Config();
		/**
		 * reids配置,支持单机、主从、哨兵、集群等配置。这里使用单机配置
		 */
		config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
		return Redisson.create(config);
	}
}

2、使用 Redisson分布式锁

代码如下:

	@Autowired
	private RedisTemplate<String, String> redisTemplate;

	@Autowired
	private RedissonClient redissonClient;

	public void disLockDemo(long productId) {
		String lockKey = "DISTRIBUTE_LOCK:redissonLock:product_" + productId;

		//设置锁定资源名称,并获取分布式锁对象。
		RLock redissonLock = redissonClient.getLock(lockKey);
		//1.加锁
		redissonLock.lock();
		//boolean isLock = disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
		try {
			//2.执行业务代码
			// TODO
            
            //if (isLock) {
				// TODO
			//}
		} finally {
			//3.解锁
			redissonLock.unlock();
		}
	}

Redisson 分布式锁源码分析

使用分布式锁必须要考虑的一些问题:

Redisson 是 Redis 官方推荐分布式锁实现方案,它采用 Watch Dog机制能够很好的解决锁续期的问题。

执行 lua脚本保证了多条命令执行的原子性操作。

带着上面分布式锁的一些问题查看源码。

1、获取分布式锁对象

简单了解一下。

1.1 创建 RedissonClient

我们在配置类中通过 Redisson.create(config)方法创建了 RedissonClient对象,并注入到 IOC容器中。

1.2 获取分布式锁对象

使用 Redisson 客户端来 获取分布式锁对象。

2、加锁代码

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //线程id
        long threadId = Thread.currentThread().getId();
        // 1.尝试获取锁
        Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
        if (ttl != null) {
            RFuture<RedissonLockEntry> future = this.subscribe(threadId);
            if (interruptibly) {
                this.commandExecutor.syncSubscriptionInterrupted(future);
            } else {
                this.commandExecutor.syncSubscription(future);
            }

            try {
                //2.死循环,反复去调用tryAcquire尝试获取锁
                while(true) {
                    // 再次尝试获取锁
                    ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
                    // ttl为null时表示别的线程已经unlock了,自己加锁成功
                    if (ttl == null) {
                        return;
                    }
                    // 3.锁互斥:通过 JDK的信号量 Semaphore来阻塞线程
                    if (ttl >= 0L) {
                        try {                                                  
                            ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException var13) {
                            if (interruptibly) {
                                throw var13;
                            }

                            ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else if (interruptibly) {
                        ((RedissonLockEntry)future.getNow()).getLatch().acquire();
                    } else {
                        ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
                    }
                }
            } finally {
                // 4.无论是否获得锁,都要取消订阅解锁消息
                this.unsubscribe(future, threadId);
            }
        }
    }

2.1 异步加锁机制

查看 tryAcquire()加锁方法。

通过源码,看到加锁其实是通过一段 lua 脚本实现的,如下:

if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end ;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end ;
return redis.call('pttl', KEYS[1]);

Redisson 实现分布式锁的共享资源的存储结构是 hash数据结构:

key 是锁的名称,field 是客户端 ID,value 是该客户端加锁(可重入)的次数。

假设此时,客户端 1 来尝试加锁,查看加锁的 lua 脚本:

第一段 if 判断语句,如果你要加锁的那个锁 key 不存在的话,进行加锁。此时锁 key不存在,向Redis中设置一个 hash 结构的数据,则客户端 1加锁成功,返回 null。

2.1.1 锁的续期机制

客户端 1 加锁的那个锁 key 默认生存时间才 30 秒,如果超过了 30 秒,客户端 1 还想一直持有这把锁,就需要提前进行锁的续期操作。

Redisson 提供了一个 Watch dog 机制来解决锁的续期问题, 只要客户端 1 一旦加锁成功,就会启动一个 Watch Dog。

进入 scheduleExpirationRenewal方法,重点查看 renewExpiration方法。

锁续期的 lua 脚本如下:

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end ;
return 0;

从源码我们看到 leaseTime 必须是 -1 才会开启 Watch Dog 机制,我们发现:

Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,

然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果客户端 1 还持有锁 key(判断客户端是否还持有 key,

其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,

如果存在就会延长 key 的时间),那么就会不断的延长锁 key 的生存时间。

如果服务宕机了,Watch Dog 机制线程也就没有了,此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。

2.1.2 可重入加锁机制

Redisson 也是支持可重入锁的,比如:客户端 1 加锁代码:

@Override
public void demo() {
    RLock lock = redissonSingle.getLock("myLock");
    try {
        lock.lock();
        // TODO 执行业务
        //锁重入
        lock.lock();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // 释放锁
        lock.unlock();
        lock.unlock();
    }
}

此时,如果客户端 1 又来尝试加锁,继续分析加锁的 lua 脚本。

2.2 锁互斥机制

上面客户端 1加锁成功,此时,如果客户端 2 来尝试加锁,继续分析加锁的 lua 脚本:

接着查看 lock方法中的 死循环部分。

流程大致如下:

注意:

当锁正在被占用时,等待获取锁的进程并不是真正通过一个 while(true) 死循环去获取锁(占 CPU资源),而时使用 JDK 的信号量 Semaphore 来阻塞线程(间断性的不断尝试获取锁),是会释放 CPU资源的。

3、锁释放代码

    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise();
        // 1. 异步释放锁
        RFuture<Boolean> future = this.unlockInnerAsync(threadId);
        // 2. 取消 Watch Dog 机制
        future.onComplete((opStatus, e) -> {
            this.cancelExpirationRenewal(threadId);
            if (e != null) {
                result.tryFailure(e);
            } else if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
                result.tryFailure(cause);
            } else {
                result.trySuccess((Object)null);
            }
        });
        return result;
    }

3.1 异步释放锁机制

查看unlockInnerAsync方法。

释放锁也是执行的 lua 脚本:

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end ;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end ;
return nil;

首先,第一段 if 判断语句,判断 key 是否存在的话,进行加锁。此时锁 key不存在,则客户端 1加锁成功,向Redis中设置一个 hash 结构的数据。返回 null。

然后,第二个 if 判断,判断一下该客户端对应的锁的 hash 结构的 value 值是否递减为 0,

3.2 取消 Watch Dog机制

查看 cancelExpirationRenewal方法。

取消 Watch Dog 机制,即将 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的线程 id 删除。

3.3 通知阻塞等待的进程

利用 Redis 的发布订阅机制,广播释放锁的消息,通知阻塞等待的进程(向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息)。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文