Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis setnx、getset、incr

Redis实现分布式锁(setnx、getset、incr)以及如何处理超时情况

作者:九十三大人

本文主要介绍了Redis实现分布式锁(setnx、getset、incr)以及如何处理超时情况,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

如果你通过网络搜索分布式锁,最多的就是基于redis的了。基于redis的分布式锁得益于redis的单线程执行机制,单线程在执行上就保证了指令的顺序化,所以很大程度上降低了开发人员的思考设计成本。

一、通过setnx实现

1、setnx key value

当且仅当key不存在,将key的值设置为value,并且返回1;若是给定的key已经存在,则setnx不做任何动作,返回0。

public static Boolean setnx(final String key, final String value, final long seconds) {
        return getShardedJedisClient().execute(new ShardedJedisAction<Boolean>() {
            public Boolean doAction(ShardedJedis shardedJedis) {
                Jedis jedis = (Jedis) shardedJedis.getShard(key);
                String result = jedis.set(key, value, "NX", "EX", seconds);
                return "OK".equalsIgnoreCase(result);
            }
        });
    }

2、get key

获取key对应的value值,如果不存在该key,返回0。

public String get(final String key) {
        this.checkIsInMulti();
        return (String)this.execute(new SmartJedis.Action<String>() {
            public String doAction(Jedis jedis) {
                return jedis.get(key);
            }
        }, SmartJedis.RW.R, key);
    }

3、getset key value

获取key的旧值,将新value放入

public static String getset(final String key, final String value) {
        return getShardedJedisClient().execute(new ShardedJedisAction<String>() {
            @Override
            public String doAction(ShardedJedis shardedJedis) {
                return shardedJedis.getSet(key, value);
            }
        });
    }

至此,我们先举个手机三要素验证的列子:(A渠道系统,业务B系统,外部厂商C系统)
(1)B业务系统调用A渠道系统,验证传入的手机、身份证、号码三要素是否一一致。
(2)A渠道系统再调用外部厂商C系统。
(3)A渠道系统将结果返回给B业务系统。
这3个过程中,(2)过程,外部厂商的调用时是需要计费的。
当B业务系统并发量很高时,有100笔相同的三要素校验,由于是相同的三要素,A渠道只要调用一次厂商即可知道结果。那么A渠道系统如何控制不让100笔请求全部去访问外部厂商C系统呢?

小明提出了方案一:

在A系统中,
当100个线程同时请求过来,进行redis.setnx(“LOCK_KEY_phone&idNo&name”,”demo”),这样第一笔线程率先拿到锁,其他的线程等待,当thread(0)处理结束后,thread(0)进行delete(“LOCK_KEY_phone&idNo&name”),把锁放开,thread(i)进行get(“LOCK_KEY_phone&idNo&name”)拿到0,说明上一笔已经处理完成,这个时候,我们可以去查询上一笔的记录。

RedisUtils.setnx("LOCK_KEY_phone&idNo&name","demo");
JSONObject result = A.request(B);
AssetUtils.notNull(result,ResponseCodeEnum.Success,"拿到结果");
ResultDmo resultDmo = (ResultDmo)BeanUtils.maptoBean(result);
resultDao.insert(resultDmo);
if(result!=0){
  //上一笔同样的请求还未处理完成,轮训等待(具体如何轮训在此不展开)
}else{
  //上一笔同样的请求处理完成,进行查库操作
  resultDao.select("参数");
}

小宏说:小明的思想不严谨

问题:当100笔线程中一些线程超时或者系统宕机等意外情况发现,锁会一直被某些线程持有,造成死锁状态。
应该给缓存key设置一个超时时间。比如:200ms

RedisUtils.setnx("LOCK_KEY_phone&idNo&name","demo",200);

这种情况是,大致判断了外部厂商C系统业务处理时间大概为200ms,

网上看还有一种方式(B):

RedisUtils.setnx("LOCK_KEY_phone&idNo&name",currentTime,200);
Long old = RedisUtils.get("LOCK_KEY_phone&idNo&name");
Long new = System.currentTimeMillis();
Long time = new - old;
if(time>0){
//处理已经超时
RedisUtils.delete("LOCK_KEY_phone&idNo&name");
}

(B)这种情况不严谨:当a获取setnx锁,a线程崩溃或超时,b、c线程同时get到old,且判断超时,可能出现b线程delete a线程的锁,并且setnx后;c线程又将b线程的锁delete,并且setnx。这种情况完全锁不住线程了。

(B)方案的升级版—->>(C)方案:

当a获取setnx锁,a线程崩溃或超时,b线程getset,获取old且判断超时,c线程getset,获取old(此时这个值是b刚刚set进去的),判断未超时,c继续等待。b线程delete a线程的锁,并且setnx后。这种情况是安全的。

需要注意的地方:
①不要轻易将get和getset混用,笔者认为getset单独使用比较好。
有一种情况,a、b、c、三个线程,a、b同时get,a立即返回了old,突然来了个c,卡在b之前getset了,且删除锁,那么b的get只能返回nil了。此时再根据时间戳对比:
a.get != (a.set)
b.get ! = (b.set)
这样a、b都没拿到锁,但是a其实已经获取到了锁。
②多个服务器时间的同步问题。

总结: 锁超时了该如何处理,通过getset方式判断时间戳差的方式,多比同时getset都得到超时,同时去setnx。总会有一个更快地去setnx。

二、通过incr抢占资源实现

1、incr

将 key 中储存的数字值增一。如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作。如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。

public static Long incr(final String key) {
        return shardedClient.execute(new ShardedJedisAction<Long>() {
            @Override
            public Long doAction(ShardedJedis shardedJedis) {
                shardedJedis.expire(key, 200);
                return shardedJedis.incr(key);
            }
        });
    }

还是上面的三要素的例子

Long result = RedisUtils.incr("LOCK_KEY_phone&idNo&name");
        if (result > 1) {
            //如果计数器>1,说明已经有请求进来
            throw new AppException(ResponseCode.FAIL.getCode(), "操作频繁");
        }


 JSONObject result = A.request(B);
 Long endTime = System.currentTimeMillis();
 Long time = endTime - startTime;
  //如果处理时间大于incr的key存活时间,说明该笔请求已经超时
  if (time > 200) {
      //全局ID,统计超时次数
      String key = "LOCK_KEY_phone&idNo&name" + source;
      RedisUtils.incr(key);
      int total = Integer.valueOf(RedisUtils.get(key));
      //断言若超时10次,进行报警(报警不在次展开)
      AssertUtils.isTrue(total < 10, ResponseCode.FAIL, "调用" + source + "渠道超时");
  }

这里设置了计数器的超时时间为200ms,如果请求超时,会有大量的线程同时访问,笔者这里有10笔同时过来,就启动报警。人为排查渠道。和setnx的不同是,某个线程超时,setnx的方式需要手动去判断,再去加锁,防止大量线程进入(这里可以通过轮训实现);而incr的方式超时了,大量线程进来,我不做处理,但是这里的time>200是具有误差的。

到此这篇关于Redis实现分布式锁(setnx、getset、incr)以及如何处理超时情况的文章就介绍到这了,更多相关Redis setnx、getset、incr内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文