java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Redis分布式锁

Redis分布式锁介绍与使用

作者:扎哇太枣糕

服务器集群项目中的锁是无法精准的锁住线程资源的,于是我们就是需要使用分布式锁,分布式锁该如何使用又有什么注意点呢?就让我们进入接下来的学习

首先,使用idea模拟搭建一个tomcat服务器集群,并使用Nginx对集群中的服务器实现负载均衡

配置完负载均衡之后,发送两次请求就会在idea的运行窗口中发现,两次请求的运行是分别在两个服务器中完成,这就是集群的轮询机制

分布式锁

业务逻辑分析

  在单JVM虚拟机多线程执行的情况下,可以使用JVM内部的锁机制来控制多进程的并发执行,借此可以保证一个用户只能下一个优惠券订单。但是在分布式的情况下,每一个JVM虚拟机都有一个锁监视器,不同JVM里的不同线程之间的访问的并不是同一个锁监视器,所以说此时再使用synchronized锁就无法满足一个用户限买一单的业务情况了,于是就需要使用分布式锁

  分布式锁就是满足分布式系统或集群模式下多进程可见并且互斥的锁。一般实现分布式锁的技术主要就是MySQL、Redis和ZooKeeper,但是综合对比来看的话,Redis作分布式锁的性能更高一些,Redis是在JVM虚拟机之外的一种应用可以满足多线程都可见,互斥可以使用setnx这种的互斥命令来实现,但是使用Redis会存在安全性问题,如果Redis崩溃的话会导致锁无法释放而出现死锁现象,解决这一问题的方案就是使用TTL过期时间,就算崩溃也可以实现到期自动释放。

Redis命令

  使用Redis实现分布式锁的步骤主要就是使用setnx体现互斥锁,然后expire过期时间防止宕机死锁,但是如果服务在setnx之后expire之前宕机的话,依旧会造成死锁现象。于是我们可以使用以下命令在互斥的同时设置超时时间,这样的话即是在设置锁之后宕机,依旧可以凭借超时时间释放锁

SET lock thread NX EX ttl超时时间

代码实现

  将获取锁和释放锁业务抽取出来,使用接口和实现类来完成

public interface ILock {
    /**
     * 尝试获取锁
     * @param timeoutSec 锁的超时时间
     * @return 是否成功获取锁
     */
    boolean tryLock(long timeoutSec);
    /**
     * 释放锁
     */
    void unLock();
}
public class SimpleRedisLock implements ILock {
    private String name;
    /**
     *先获取StringRedisTemplate对象,才能使用代码操作Redis
     */
    private StringRedisTemplate stringRedisTemplate;
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取当前操作线程的标识
        long threadId = Thread.currentThread().getId();
        // 获取锁
        Boolean res = stringRedisTemplate.opsForValue()
                .setIfAbsent(RedisConstants.KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
        // res是Boolean的包装类,返回结果的时候涉及到拆箱问题,有可能存在结果为null的情况,此时就需要返回结果与true的比较,避免了空指针风险
        return Boolean.TRUE.equals(res);
    }
    @Override
    public void unLock() {
        // 释放锁
        stringRedisTemplate.delete(RedisConstants.KEY_PREFIX + name);
    }
}

  定义了分布式锁的获取和释放,接下来就是在一人一单业务代码中将锁机制升级成多线程锁了,主要修改的代码为就是5~14行,由单体的synchronized锁改为使用自定义的Redis锁,并根据不同线程获取锁的不同结果定义了不同的业务

public Result secKillVoucher(Long voucherId) {
    // 单用户id(拦截器中做登录验证的用户id)
    Long userId = UserHolder.getUser().getId();
    // 创建锁对象
    SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
    // 获取锁
    boolean isLock = lock.tryLock(1200);
    // 判断是否获取锁成功
    if (!isLock) {
        // 获取锁失败,返回错误或者重试
        return Result.fail("不允许重复下单!" );
    }
    // 获取锁成功,继续下单的业务逻辑
    try {
        // 查询优惠券
        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
        // 获取时间 判断秒杀活动是否开始或者结束
        if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
            return Result.fail("活动暂未开始");
        } else if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
            return Result.fail("活动已经结束");
        }
        // 判断库存是否充足
        if (seckillVoucher.getStock() < 1) {
            return Result.fail("库存不足,活动结束");
        }
        // user_id和voucher_id联合查询订单数
        int count = query().eq("user_id", userId)
                .eq("voucher_id", voucherId)
                .count();
        // 订单数为1 就说明已经下过单了
        if (count > 0) {
            return Result.fail("您已经购买过该商品了");
        }
        // 扣减库存
        boolean update = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherId)
                .gt("stock", 0)
                .update();
        if (!update) {
            return Result.fail("库存不足!");
        }
        // 创建订单 并返回id
        VoucherOrder order = new VoucherOrder();
        // 订单id(redis全局唯一id) 下单用户id(拦截器中做登录验证的用户id) 优惠券id(直接传过来的id)
        long orderId = generator.nextId("order");
        order.setId(orderId);
        order.setUserId(userId);
        order.setVoucherId(voucherId);
        save(order);
        return Result.ok(orderId);
    } finally {
        // 释放锁
        lock.unLock();
    }
}

分布式锁误删问题

问题原因分析

  这个问题出现在Redis锁设置的超时时间上,由于设置了超时时间,所以可能出现一下情况:即当线程1获取到锁之后执行下单业务,但是由于业务堵塞锁已经超出TTL时间自动释放;此时线程2趁机获取Redis锁成功执行下单业务,线程2的下单业务执行到一半时线程1完成下单使用del命令释放锁;此时线程1释放的是线程2的锁,于是现在锁又处于闲置状态,于是线程3来获取Redis锁成功执行下单业务;此时,一共有同一个用户的两个线程在同时操作

  为了解决以上出现的问题,需要在每次释放锁之前都通过锁的线程标识(Redis锁对应的值)判断一下是不是自己的锁,如果是就使用del命令释放锁,否则就不做操作。但是有一点值得注意,之前锁的线程标识使用的是线程的name,这样的话很容易就造成不同JVM虚拟机里的线程name冲突影响判断,于是可以使用UUID随机生成一组数字加上线程name作为线程的标识,这样更能确保唯一性

代码实现

  综上所述,一共有两处需要改进的地方,一个是使用UUID加线程name作为线程标识(主要修改的是获取锁方法加上UUID的获取),一个是在使用del释放锁之前判断一下是否是自己的锁

public static final String ID_PREFIX = UUID.randomUUID(true) + "-";
public boolean tryLock(long timeoutSec) {
    // 获取当前操作线程的标识
    String threadId = RedisConstants.ID_PREFIX + Thread.currentThread().getId();
    // 获取锁
    Boolean res = stringRedisTemplate.opsForValue()
            .setIfAbsent(RedisConstants.KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
    // res是Boolean的包装类,返回结果的时候涉及到拆箱问题,有可能存在结果为null的情况,此时就需要返回结果与true的比较,避免了空指针风险
    return Boolean.TRUE.equals(res);
}
public void unLock() {
    // 获取当前操作线程的标识
    String threadId = RedisConstants.ID_PREFIX + Thread.currentThread().getId();
    // 通过锁名 获取redis中存储的锁对应的标识
    String rid = stringRedisTemplate.opsForValue().get(RedisConstants.KEY_PREFIX + name);
    if (threadId.equals(rid)) {
        // 释放锁
        stringRedisTemplate.delete(RedisConstants.KEY_PREFIX + name);
    }
}

Lua脚本

  Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:传送门

使用Redis命令调用脚本的常见命令可以是:

EVAL “redis.call(‘set’, ‘key’, ‘value’)” num

  上述命令解释为EVAL是调用,后面双引号中就是所调用的脚本语句,而最后的num即脚本语句中的KEYS类型参数的个数,num之外的就是ARGV(value)类型的参数。比如说,接下来这一个语句就代表着:setname为Rose,其中KEYS类型的参数有1个,就是num后面的第一个name,剩下的都是ARGV(value)类型的数据,其中调用的是KEYS[1]和ARGV[2],也就是name和Rose

EVAL “redis.call(‘set’, ‘KEYS[1]’, ‘ARGV[2]’)” 1 name age Rose

到此这篇关于Redis分布式锁介绍与使用的文章就介绍到这了,更多相关Redis分布式锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文