Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > redis和redission分布式锁原理

redis和redission分布式锁原理及区别说明

作者:qq_35478580

文章对比了synchronized、乐观锁、Redis分布式锁及Redission锁的原理与区别,指出在集群环境下synchronized失效,乐观锁存在数据库性能瓶颈,而Redission通过watchdog自动续期和Lua原子操作解决Redis锁的超时问题,推荐其在高并发场景下的可靠性与易用性

redis和redission分布式锁原理及区别

我最近做租车项目,在处理分布式时用到分布式锁,我发现很多同事都在网上找分布式锁的资料,但是看的资料都不是很全,所以在这里我谈谈自己的分布式锁理解。

结合我的其中某一业务需求:多个用户在同一个区域内发现只有一辆可租的车,最终结果肯定只有一位用户租车成功,这就产生了多线程(多个用户)抢同一资源的问题。

1、有的同伴想到了synchronized关键字锁

暂且抛开性能问题,项目为了高可用,都会做集群部署,那么synchronized就失去了加锁的意义,这里多嘴解释一下:

2、有的小伙伴可能想到了乐观锁

没错!!乐观锁可以解决的我的问题,但是在高并发的场景,频繁的操作数据库,数据库的资源是很珍贵的,并且还存在性能的问题。但是我这里简单说下乐观锁的使用:

select * from u_car where car_id = 10;
update u_car set status = 2,version = version +1 where car_id = 10 and version = 1

在修改的时候将version作为参数,如果其他用户锁车,那么version已经发生变化(version = version +1),所以version = 1不成立,修改失败

乐观锁不是本次的终点,但还是简单说下;

3、使用redis的分布式锁

	public boolean lock(String key, V v, int expireTime){
           //获取锁
           //在redis早期版本中,设置key和key的存活时间是分开的,设置key成功,但是设置存活时间时服务宕机,那么你的key就永远不会过期,有BUG
           //后来redis将加锁和设置时间用同一个命令
           //这里是重点,redis.setNx(key,value,time)方法是原子性的,设置key成功说明锁车成功,如果失败说明该车被别人租了
         boolean b = false;
         try {
         	b = redis.setNx(key, v, expireTime);
         } catch (Exception e) {
        	log.error(e.getMessage(), e);
    	 }
    	 return b;
    }
    public boolean unlock(String key){
        return redis.delete(key);
    }
}

但是这样写还是存在BUG的,我的key设置了加锁时间为5秒,但是我的业务逻辑5秒还没有执行完成,key过期了,那么其他用户执行redis.setNx(key, v, expireTime)时就成功了,将该车锁定,又产生了抢资源;我们想一下,如果我能够在业务逻辑没有执行完的时候,让锁过期后能够延长锁的时间,是不是就解决了上面的BUG;

实现这个锁的延长,非要自己动手的话就得另启一个线程来监听我们的业务线程,每隔1秒监测当前业务线程是否执行完成,如果没有就获取key的存活时间,时间小于一个阈值时,就自动给key设置N秒;当然,我们可以不用自己动手,redission已经帮我们实现key的时间时间过期问题;

4、使用redission的分布式锁

//引入依赖
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.10.6</version>
        </dependency>

redisson支持单点、集群等模式,这里选择单点的。

spring:  
    redis:
        host: 127.0.0.1
        port: 6379
        password: 
@Configuration
public class RedisConfig {
    @Value("${spring.redis.host}")
    private String host;
 
    @Bean(name = {"redisTemplate", "stringRedisTemplate"})
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate redisTemplate = new StringRedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }
 
    @Bean
    public Redisson redisson() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://" + host + ":6379");
        return (Redisson) Redisson.create(config);
    }
 
}
private Logger log = LoggerFactory.getLogger(getClass());
@Resource
private Redisson redisson;
//加锁
public Boolean lock(String key,long waitTime,long leaseTime){
	Boolean  b = false;
	try {
        RLock rLock = redisson.getLock(key);
        //说下参数 waitTime:锁的存活时间 leaseTime:锁的延长时间 后面的参数是单位
        b = rLock.tryLock(waitTime,leaseTime,TimeUnit.SECONDS);
      } catch (Exception e) {
         log.error(e.getMessage(), e);
      } 
    }
    return b;
}
//释放锁
public void unlock(String key){
	try {
		RLock rLock = redisson.getLock(key);
		if(null!=lock){
			lock.unlock();
			lock.forceUnlock();
			fileLog.info("unlock succesed");
    	}
    } catch (Exception e) {
        fileLog.error(e.getMessage(), e);
    }
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        //尝试获取锁,如果没取到锁,则获取锁的剩余超时时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        //如果waitTime已经超时了,就返回false
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }
 
        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
            //进入死循环,反复去调用tryAcquire尝试获取锁,ttl为null时就是别的线程已经unlock了
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
 
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
 
                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
 
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

可以看到,其中主要的逻辑就是尝试加锁,成功了就返回true,失败了就进入死循环反复去尝试加锁。中途还有一些超时的判断。逻辑还是比较简单的。

上面的lua(俗称胶水语言)脚本比较重要,主要是为了执行命令的原子性解释一下:

第一段if就是判断你的key是否存在,如果不存在,就执行redis call(hset key ARGV[2],1)加锁和设置redis call(pexpire key ARGV[1])存活时间;

当第二个客户来加锁时,第一个if判断已存在key,就执行第二个if判断key的hash是否存在客户端2的ID,很明显不是;

则进入到最后的return返回该key的剩余存活时间

当加锁成功后会在后台启动一个watch dog(看门狗)线程,key的默认存活时间为30秒,则watch dog每隔10秒钟就会检查一下客户端1是否还持有该锁,如果持有,就会不断的延长锁key的存活时间

所以这里建议大家在设置key的存活时间时,最好大于10秒,延续时间也大于等于10秒

所以,总体流程应该是这样的。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文