Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis异步秒杀

Redis消息队列实现异步秒杀功能

作者:starrismq

在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给 Redis 处理,并通过异步方式执行,Redis 提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Redis消息队列实现异步秒杀功能,感兴趣的朋友一起看看吧

1 Redis消息队列

在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给 Redis 处理,并通过异步方式执行。Redis 提供了多种数据结构来实现消息队列,总结三种。

1.1 List 结构

1.2 Pub/Sub 模式

1.3 Stream 结构

1.4 Redis Stream消息队列的特点

Redis 5.0引入的Stream类型是专门为消息队列设计的,支持以下特性:

2 秒杀业务处理

2.1 使用Lua脚本处理库存和订单

目标:在Redis中完成库存判断和订单校验,确保原子性。

-- 参数:优惠券ID、用户ID、订单ID
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
-- 库存Key和订单Key
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
-- 判断库存是否充足
if (tonumber(redis.call('GET', stockKey)) <= 0 then
    return 1 -- 库存不足
end
-- 判断用户是否已下单
if (redis.call('SISMEMBER', orderKey, userId) == 1 then
    return 2 -- 用户已下单
end
-- 扣减库存并记录订单
redis.call('DECR', stockKey)
redis.call('SADD', orderKey, userId)
-- 将订单信息发送到消息队列
redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0 -- 成功

脚本说明

 2.2 创建消费者组

g1:消费者组名称。

MKSTREAM:如果队列不存在则自动创建。

2.3 Java代码实现

系统启动与初始化

系统启动时,VoucherOrderServiceImpl 类的 @PostConstruct 注解会触发 init 方法执行。该方法先加载创建消息队列的 Lua 脚本,通过 stringRedisTemplate.execute 方法执行脚本创建 Redis Stream 消息队列和消费者组。若创建成功或队列已存在,会记录相应日志。之后,使用线程池 SECKILL_ORDER_EXECUTOR 启动 VoucherOrderHandler 线程,该线程负责后续从消息队列获取订单信息并处理。

用户发起秒杀请求

用户发起秒杀请求后,系统调用 VoucherOrderServiceImplseckillVoucher 方法。此方法先从 ThreadLocalUtls 中获取用户 ID,用 redisIdWorker 生成订单 ID。接着执行判断用户秒杀资格的 Lua 脚本,该脚本接收优惠券 ID、用户 ID 和订单 ID 作为参数。若脚本返回值表明库存不足或用户已下单,方法返回相应的失败提示;若返回值为 0,说明用户有秒杀资格,创建代理对象并返回下单成功结果。

Lua 脚本执行逻辑

Lua 脚本接收到参数后,根据优惠券 ID 拼接库存和订单的 Redis key。先通过 GET 命令获取库存,若库存小于等于 0 则返回 1 表示库存不足。若库存充足,使用 SISMEMBER 命令检查用户是否已下单,若已下单则返回 2。若库存充足且用户未下单,使用 INCRBY 命令扣减库存,SADD 命令记录订单信息,最后返回 0 表示下单成功。

订单处理线程工作

VoucherOrderHandler 线程启动后进入无限循环,不断从 Redis Stream 消息队列获取订单信息。若未获取到消息,继续下一次循环;若获取到消息,将消息转换为 VoucherOrder 对象,调用 handleVoucherOrder 方法处理订单,处理完成后向消息队列发送 ACK 确认消息。若处理过程中出现异常,调用 handlePendingList 方法处理异常消息。

订单处理方法 handleVoucherOrder

handleVoucherOrder 方法接收 VoucherOrder 对象,根据用户 ID 获取 Redisson 分布式锁。尝试获取锁,若失败记录错误日志并返回;若成功,调用代理对象的 createVoucherOrder 方法创建订单,最后释放锁。

订单创建方法 createVoucherOrder

该方法先判断当前用户是否是第一单,通过查询数据库中该用户的订单数量来判断。若不是第一单,记录错误日志并返回;若是第一单,尝试扣减秒杀券库存,若扣减失败抛出异常。若库存扣减成功,将订单信息保存到数据库,若保存失败也抛出异常。

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Resource
    private ISeckillVoucherService seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private RedissonClient redissonClient;
    /**
     * 当前类初始化完毕就立马执行该方法
     */
    @PostConstruct
    private void init() {
        // 创建消息队列
        DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>();
        mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua"));
        mqScript.setResultType(Long.class);
        Long result = null;
        try {
            result = stringRedisTemplate.execute(mqScript,
                    Collections.emptyList(),
                    QUEUE_NAME,
                    GROUP_NAME);
        } catch (Exception e) {
            log.error("队列创建失败", e);
            return;
        }
        int r = result.intValue();
        String info = r == 1 ? "队列创建成功" : "队列已存在";
        log.debug(info);
        // 执行线程任务
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }
    /**
     * 线程池
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    /**
     * 队列名
     */
    private static final String QUEUE_NAME = "stream.orders";
    /**
     * 组名
     */
    private static final String GROUP_NAME = "g1";
    /**
     * 线程任务: 不断从消息队列中获取订单
     */
    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    // 1、从消息队列中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order >
                    List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
                            Consumer.from(GROUP_NAME, "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
                            StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())
                    );
                    // 2、判断消息获取是否成功
                    if (messageList == null || messageList.isEmpty()) {
                        // 2.1 消息获取失败,说明没有消息,进入下一次循环获取消息
                        continue;
                    }
                    // 3、消息获取成功,可以下单
                    // 将消息转成VoucherOrder对象
                    MapRecord<String, Object, Object> record = messageList.get(0);
                    Map<Object, Object> messageMap = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
                    handleVoucherOrder(voucherOrder);
                    // 4、ACK确认 SACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                    // 处理异常消息
                    handlePendingList();
                }
            }
        }
    }
    private void handlePendingList() {
        while (true) {
            try {
                // 1、从pendingList中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order 0
                List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
                        Consumer.from(GROUP_NAME, "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
                        StreamOffset.create(QUEUE_NAME, ReadOffset.from("0"))
                );
                // 2、判断pendingList中是否有效性
                if (messageList == null || messageList.isEmpty()) {
                    // 2.1 pendingList中没有消息,直接结束循环
                    break;
                }
                // 3、pendingList中有消息
                // 将消息转成VoucherOrder对象
                MapRecord<String, Object, Object> record = messageList.get(0);
                Map<Object, Object> messageMap = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
                handleVoucherOrder(voucherOrder);
                // 4、ACK确认 SACK stream.orders g1 id
                stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
            } catch (Exception e) {
                log.error("处理订单异常", e);
                // 这里不用调自己,直接就进入下一次循环,再从pendingList中取,这里只需要休眠一下,防止获取消息太频繁
                try {
                    Thread.sleep(20);
                } catch (InterruptedException ex) {
                    log.error("线程休眠异常", ex);
                }
            }
        }
    }
    /**
     * 创建订单
     *
     * @param voucherOrder
     */
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
        boolean isLock = lock.tryLock();
        if (!isLock) {
            // 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)
            log.error("一人只能下一单");
            return;
        }
        try {
            // 创建订单(使用代理对象调用,是为了确保事务生效)
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            lock.unlock();
        }
    }
    /**
     * 加载 判断秒杀券库存是否充足 并且 判断用户是否已下单 的Lua脚本
     */
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
    /**
     * VoucherOrderServiceImpl类的代理对象
     * 将代理对象的作用域进行提升,方面子线程取用
     */
    private IVoucherOrderService proxy;
    /**
     * 抢购秒杀券
     *
     * @param voucherId
     * @return
     */
    @Transactional
    @Override
    public Result seckillVoucher(Long voucherId) {
        Long userId = ThreadLocalUtls.getUser().getId();
        long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);
        // 1、执行Lua脚本,判断用户是否具有秒杀资格
        Long result = null;
        try {
            result = stringRedisTemplate.execute(
                    SECKILL_SCRIPT,
                    Collections.emptyList(),
                    voucherId.toString(),
                    userId.toString(),
                    String.valueOf(orderId)
            );
        } catch (Exception e) {
            log.error("Lua脚本执行失败");
            throw new RuntimeException(e);
        }
        if (result != null && !result.equals(0L)) {
            // result为1表示库存不足,result为2表示用户已下单
            int r = result.intValue();
            return Result.fail(r == 2 ? "不能重复下单" : "库存不足");
        }
        // 2、result为0,下单成功,直接返回ok
        // 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        this.proxy = proxy;
        return Result.ok();
    }
    /**
     * 创建订单
     *
     * @param voucherOrder
     * @return
     */
    @Transactional
    @Override
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();
        // 1、判断当前用户是否是第一单
        int count = this.count(new LambdaQueryWrapper<VoucherOrder>()
                .eq(VoucherOrder::getUserId, userId));
        if (count >= 1) {
            // 当前用户不是第一单
            log.error("当前用户不是第一单");
            return;
        }
        // 2、用户是第一单,可以下单,秒杀券库存数量减一
        boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
                .eq(SeckillVoucher::getVoucherId, voucherId)
                .gt(SeckillVoucher::getStock, 0)
                .setSql("stock = stock -1"));
        if (!flag) {
            throw new RuntimeException("秒杀券扣减失败");
        }
        // 3、将订单保存到数据库
        flag = this.save(voucherOrder);
        if (!flag) {
            throw new RuntimeException("创建秒杀券订单失败");
        }
    }
}

3 秒杀流程剖析

3.1 初始化操作

Lua 脚本准备:编写 Lua 脚本,接收优惠券 ID 和用户 ID 作为参数,判断库存是否充足以及用户是否已下单。若库存不足返回 1,用户已下单返回 2,下单成功返回 0。

-- 优惠券id
local voucherId = ARGV[1];
-- 用户id
local userId = ARGV[2];
local stockKey = 'seckill:stock:' .. voucherId;
local orderKey = 'seckill:order:' .. voucherId;
local stock = redis.call('GET', stockKey);
if (tonumber(stock) <= 0) then
    return 1;
end
if (redis.call('SISMEMBER', orderKey, userId) == 1) then
    return 2;
end
redis.call('INCRBY', stockKey, -1);
redis.call('SADD', orderKey, userId);
return 0;

消息队列创建:在 Java 代码的 @PostConstruct 方法中,通过执行 Lua 脚本创建 Redis 的 Stream 消息队列和消费者组。

@PostConstruct
private void init() {
    DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>();
    mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua"));
    mqScript.setResultType(Long.class);
    Long result = stringRedisTemplate.execute(mqScript, Collections.emptyList(), QUEUE_NAME, GROUP_NAME);
    if (result == 1) {
        log.debug("队列创建成功");
    } else {
        log.debug("队列已存在");
    }
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

3.2 秒杀请求处理

资格判断:用户发起秒杀请求,系统执行 Lua 脚本,根据返回结果判断用户是否具有秒杀资格。若返回 1 表示库存不足,返回 2 表示用户已下单,均返回失败信息;返回 0 则表示具有秒杀资格。

@Override
public Result seckillVoucher(Long voucherId) {
    Long userId = ThreadLocalUtls.getUser().getId();
    long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), 
                                            voucherId.toString(), userId.toString(), String.valueOf(orderId));
    if (result != 0) {
        return Result.fail(result == 2 ? "不能重复下单" : "库存不足");
    }
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    this.proxy = proxy;
    return Result.ok();
}

订单入队:具有秒杀资格后,生成订单 ID,创建订单对象,将订单信息发送到 Redis 的 Stream 消息队列。

3.3 消息队列消费

订单处理线程:使用线程池启动一个线程任务 VoucherOrderHandler,不断从消息队列中获取订单信息。

private class VoucherOrderHandler implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
                    Consumer.from(GROUP_NAME, "c1"),
                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
                    StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())
                );
                if (messageList == null || messageList.isEmpty()) {
                    continue;
                }
                MapRecord<String, Object, Object> record = messageList.get(0);
                Map<Object, Object> messageMap = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
                handleVoucherOrder(voucherOrder);
                stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
            } catch (Exception e) {
                log.error("处理订单异常", e);
                handlePendingList();
            }
        }
    }
}

异常处理:若处理订单过程中出现异常,调用 handlePendingList 方法从 pendingList 中获取未处理的订单信息,继续处理。

 3.4 订单创建

分布式锁保障:使用 Redisson 分布式锁,确保同一用户同一时间只能创建一个订单,避免一人多单问题。

private void handleVoucherOrder(VoucherOrder voucherOrder) {
    Long userId = voucherOrder.getUserId();
    RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
    boolean isLock = lock.tryLock();
    if (!isLock) {
        log.error("一人只能下一单");
        return;
    }
    try {
        proxy.createVoucherOrder(voucherOrder);
    } finally {
        lock.unlock();
    }
}

数据库操作:判断用户是否是第一单,若是则扣减库存并将订单保存到数据库。

@Override
public void createVoucherOrder(VoucherOrder voucherOrder) {
    Long userId = voucherOrder.getUserId();
    Long voucherId = voucherOrder.getVoucherId();
    int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId));
    if (count >= 1) {
        log.error("当前用户不是第一单");
        return;
    }
    boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
        .eq(SeckillVoucher::getVoucherId, voucherId)
        .gt(SeckillVoucher::getStock, 0)
        .setSql("stock = stock -1"));
    if (!flag) {
        throw new RuntimeException("秒杀券扣减失败");
    }
    flag = this.save(voucherOrder);
    if (!flag) {
        throw new RuntimeException("创建秒杀券订单失败");
    }
}

4 秒杀流程(文字版)

1. 初始化准备

在系统启动阶段,我们会完成一些必要的初始化工作。一方面,编写好用于判断库存和订单情况的 Lua 脚本。这个脚本会接收优惠券 ID 和用户 ID 作为参数,通过 Redis 的相关命令判断库存是否充足以及用户是否已下单,保证这些判断操作的原子性。另一方面,在 Java 代码里利用 @PostConstruct 注解,通过执行另一个 Lua 脚本来创建 Redis 的 Stream 消息队列和消费者组,为后续处理订单消息做好准备。

2. 用户请求与资格判断

当用户发起秒杀请求后,系统会立即执行之前准备好的 Lua 脚本来判断用户是否具有秒杀资格。

3. 消息队列消费

有一个专门的消息队列消费者线程会持续监听 Redis 的 Stream 消息队列。

4. 订单创建与处理

获取到锁之后,系统会进一步处理订单。

到此这篇关于Redis消息队列实现异步秒杀的文章就介绍到这了,更多相关Redis异步秒杀内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文