Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis高并发超卖问题解决

Redis高并发超卖问题解决方案图文详解

作者:山河亦问安

Redis是一种基于内存的数据存储系统,被广泛用于解决高并发问题,下面这篇文章主要给大家介绍了关于Redis高并发超卖问题解决方案的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下

1. Redis高并发超卖问题解决方案

在高并发的秒杀抢购场景中,常常会面临一个称为“超卖”(Over-Selling)的问题。超卖指的是同一件商品被售出的数量超过了实际库存数量,导致库存出现负数。这是由于多个用户同时发起抢购请求,而系统未能有效地控制库存的并发访问。

下面进行一个秒杀购买某个商品的接口模拟,代码如下:

@RestController
public class MyController {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/buy/{id}")
    public String buy(@PathVariable("id") Long id){
        String key="product_" + id;
        int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
        if(count>0){
            stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
            System.out.println(key+"商品购买成功,剩余库存"+count);
            return "success";
        }
        System.out.println(key+"商品库存不足");
        return "error";
    }
}

上面的代码在高并发环境下容易出现超卖问题,使用JMeter进行压测,如下图:

 进行压测获得的日志如下图,存在并发安全问题。

 要解决上面的问题,我们一开始想到的是synchronized加锁,但是在 Redis 的高并发环境下,使用 Java 中的 synchronized关键字来解决超卖问题是行不通的,原因如下:

对于 Redis 高并发环境下的超卖问题,更合适的解决方案通常是使用 Redis 提供的分布式锁(如基于 Redis 的分布式锁实现)。这可以确保在分布式环境中的原子性和可靠性。

基于Redis的分布式锁,我们可以基于Redis中的Setnx(命令在指定的 key 不存在时,为 key 设置指定的值),更改代码如下:

@RestController
public class MyController {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/buy/{id}")
    public String buy(@PathVariable("id") Long id){
        String lock="product_lock_"+id;
        String key="product_" + id;
        Boolean lock1 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock");
        String message="error";
        if(!lock1){
            System.out.println("业务繁忙稍后再试");
            return "业务繁忙稍后再试";
        }
        //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
        try {
                int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
                if (count > 0) {
                    stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
                    System.out.println(key + "商品购买成功,剩余库存" + count);
                    message="success";
                }
        }catch (Throwable e){
            e.printStackTrace();
        }finally {
            stringRedisTemplate.delete(lock);
        }
        if(message.equals("error"))
        System.out.println(key+"商品库存不足");
        return message;
    }
}

然后使用JMeter压测,在10s内陆续发送500个请求,日志如下图,由图可以看出基本解决超卖问题。

1.1 高并发场景超卖bug解析

系统在达到finally块之前崩溃宕机,锁可能会一直存在于Redis中。这可能会导致其他进程或线程无法在未来获取该锁,从而导致资源被锁定,后续尝试访问该资源的操作可能被阻塞。因此在redis中给定 key设置过期时间。代码如下:

@RestController
public class MyController {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/buy/{id}")
    public String buy(@PathVariable("id") Long id){
        String lock="product_lock_"+id;
        String key="product_" + id;
        Boolean lock1 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock",10, TimeUnit.SECONDS); //保证原子性
//        Boolean lock2 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock");
//        stringRedisTemplate.expire(lock,10,TimeUnit.SECONDS); //此时宕机依旧会出现redis锁无法释放,应设置为原子操作
        String message="error";
        if(!lock1){
            System.out.println("业务繁忙稍后再试");
            return "业务繁忙稍后再试";
        }
        //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
        try {
                int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
                if (count > 0) {
                    stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
                    System.out.println(key + "商品购买成功,剩余库存" + count);
                    message="success";
                }
        }catch (Throwable e){
            e.printStackTrace();
        }finally {
            stringRedisTemplate.delete(lock);
        }
        if(message.equals("error"))
        System.out.println(key+"商品库存不足");
        return message;
    }
}

在高并发场景下,还存在一个问题,即业务执行时间过长可能导致 Redis 锁提前释放,并且误删除其他线程或进程持有的锁。这可能发生在以下情况:

修改代码如下:

@RestController
public class MyController {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/buy/{id}")
    public String buy(@PathVariable("id") Long id){
        String lock="product_lock_"+id;
        String key="product_" + id;
        String clientId=UUID.randomUUID().toString();
        Boolean lock1 = stringRedisTemplate.opsForValue().setIfAbsent(lock, clientId,10, TimeUnit.SECONDS); //保证原子性
//        Boolean lock2 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock");
//        stringRedisTemplate.expire(lock,10,TimeUnit.SECONDS); //此时宕机依旧会出现redis锁无法释放,应设置为原子操作
        String message="error";
        if(!lock1){
            System.out.println("业务繁忙稍后再试");
            return "业务繁忙稍后再试";
        }
        //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
        try {
                int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
                if (count > 0) {
                    stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
                    System.out.println(key + "商品购买成功,剩余库存" + count);
                    message="success";
                }
        }catch (Throwable e){
            e.printStackTrace();
        }finally {
            if (stringRedisTemplate.opsForValue().get(lock).equals(clientId))//在这里如果有别的业务代码并且耗时较长, stringRedisTemplate.delete(lock)之前还是有可能超过过期时间出现问题
                stringRedisTemplate.delete(lock);
        }
        if(message.equals("error"))
        System.out.println(key+"商品库存不足");
        return message;
    }
}

上面的代码在高并发场景下仍然存在概率很低的问题,所以就有了redisson分布式锁。

1.2 Redisson

Redisson 是一个用于 Java 的 Redis 客户端,它提供了丰富的功能,包括分布式锁。Redisson 的分布式锁实现了基于 Redis 的分布式锁,具有简单易用、可靠性高的特点。

以下是 Redisson 分布式锁的一些重要特性和用法:

导入依赖

       <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.23.5</version>
        </dependency>

application.yaml 配置:

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0
        max-wait: 1000ms

RedissonConfig配置:

@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;


    /**
     * RedissonClient,单机模式
     */
    @Bean
    public RedissonClient redisson() {
        Config config = new Config();
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setAddress("redis://" + host + ":" + port);
        return Redisson.create(config);
    }
}

利用Redisson分布式锁解决超卖问题,修改代码如下:

加锁 lock.lock()

阻塞等待,默认等待,加锁的默认时间都是30s,锁的自动续期,如果业务时间长,运行期间会自动给锁续上新的30s,不用担心业务时间长导致锁自动过期被删除,加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s后自动删除。

加锁 lock.lock(10,TimeUnit.SECONDS)

锁到期后,不会自动续期,如果传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间。如果未指定锁的超时时间,只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s就会自动进行续期。

@RestController
public class MyController {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Autowired
    RedissonClient redisson;

    @RequestMapping("/buy/{id}")
    public String buy(@PathVariable("id") Long id){
        String message="error";
        String lock_key="product_lock_"+id;
        String key="product_" + id;
        RLock lock = redisson.getLock(lock_key);
        //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
        try {
                lock.lock();
                int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
                if (count > 0) {
                    stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
                    System.out.println(key + "商品购买成功,剩余库存" + count);
                    message="success";
                }
        }catch (Throwable e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        if(message.equals("error"))
        System.out.println(key+"商品库存不足");
        return message;
    }
}

总结 

到此这篇关于Redis高并发超卖问题解决方案的文章就介绍到这了,更多相关Redis高并发超卖问题解决内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文