java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring Boot  Redisson延迟队列

Spring Boot 项目集成 Redisson 实现延迟队列的详细过程

作者:徐州蔡徐坤

本文介绍延迟队列在订单超时等场景的应用及四种技术方案对比,推荐Redisson延迟队列,提供项目结构与测试源码,对Spring Boot  Redisson延迟队列相关知识感兴趣的朋友一起看看吧

延迟队列应用场景

  1. 订单支付超时:用户下单后30分钟未支付,自动取消订单。
  2. 订单评价超时:订单签收后7天未评价,系统默认好评。
  3. 商家接单超时:下单成功后商家5分钟未接单,订单取消。
  4. 配送超时提醒:配送超时,推送短信提醒。

技术选型分析

针对延迟任务处理机制,主要可选方案有以下四种:定时轮询、Redisson 延迟队列、消息中间件、Redis 过期监听。

1. 定时任务轮询

机制: 通过定时任务如 @Scheduled 或 Quartz),以固定频率轮询数据库或 Redis,查找已到期的任务并处理。

优点:由于springboot原生支持,实现成本低,并且可以任务统一管理

缺点

适用场景:小型系统、任务量小、业务容忍较大延迟的场景。

2. Redisson 延迟队列(推荐)

机制说明
基于 Redis ZSet 和 List 封装的延迟队列,由 Redisson 实现,支持回调消费。

优点

缺点:实现依赖 Redisson,同时需要保证Redis 崩溃等情况设计补偿保护机制

适用场景:中大型系统、微服务架构下的延迟任务处理。

3. 消息中间件延迟队列( RabbitMQ、Kafka等)

机制:通过消息中间件的 TTL(消息生存时间)和死信队列机制实现延迟任务,例如 RabbitMQ 的 DLX(死信交换机)或 Kafka 的延迟消费。

优点

缺点

适用场景:高并发、高可用要求的核心业务,如订单超时关闭、促销活动控制等。

4. Redis Key 过期监听(不推荐)

最初是通过redis的思路来实现延迟队列功能,但是通过查询资料和官方文档发现,redis并不适合此种场景

机制
通过启用 Redis 的 Keyspace Notification 功能,监听键过期事件(需设置 notify-keyspace-events 配置项)。

官方文档说明

Redis 中 Key 的过期事件 expired 有两种触发方式:

  • 在访问 Key 时发现其已过期
  • 后台线程定期扫描并删除过期 Key

因此,并不能保证在 TTL 恰好归零时立即触发过期事件,也不保证事件一定会触发。

缺点

适用场景:临时性、非强一致性场景,如验证码、状态标记清理等。

参考

选型建议总结

方案实现难度实时性可靠性分布式支持推荐场景
定时任务轮询有限简单、低频业务
Redisson 延迟队列分布式、业务量中等、场景标准
MQ 延迟队列极好高并发、大量异步、核心任务场景
Redis 过期监听不确定非核心场景,缓存状态变更类任务

Redisson 延迟队列实现

项目结构

├── config
│   └── RedissonConfig.java                # 配置 Redisson 客户端,创建 RedissonClient Bean
├── controller
│   └── DeliveryController.java            # 提供REST 接口模拟订单创建和收货操作,触发延迟任务
├── enums
│   └── DelayQueueEnum.java                # 定义延迟队列的业务枚举及其关联的处理类
├── hander
│   ├── DelayQueueHandler.java             # 延迟队列处理器接口,定义 execute 方法
│   ├── EvaluationTimeoutHandler.java      # 处理评价超时逻辑的具体实现类
│   └── OrderPaymentTimeoutHandler.java    # 处理订单支付超时逻辑的具体实现类
├── runner
│   └── RedisDelayQueueRunner.java         # 启动后监听并执行延迟队列任务,使用线程池并发处理
├── utils
│   ├── RedisDelayQueueUtil.java           # 封装 Redis 延迟队列的操作方法(添加/获取元素)
│   └── SpringUtils.java                   # 工具类,用于在非 Spring 管理类中获取 Bean
└── SpringbootApplication.java             # Spring Boot 主类,包含程序入口 main 方法

Redis延迟队列工具类

package com.zhou.demo.utils;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
 * redis延迟队列工具
 */
@Slf4j
@Component
public class RedisDelayQueueUtil {
    @Resource
    private RedissonClient redissonClient;
    /**
     * 将元素添加到延迟队列中
     *
     * @param queueCode 队列键(用于标识不同的队列)
     * @param value     要添加到队列中的值(泛型类型)
     * @param delay     延迟时间(指定元素在队列中延迟被消费的时间)
     * @param timeUnit  时间单位(与延迟时间配合使用,如秒、毫秒等)
     */
    public <T> void addDelayQueue(String queueCode, T value, long delay, TimeUnit timeUnit) {
        try {
            RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
            RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            delayedQueue.offer(value, delay, timeUnit);
            log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}秒", queueCode, value, timeUnit.toSeconds(delay));
        } catch (Exception e) {
            log.error("(添加延时队列失败) {}", e.getMessage(), e);
            throw new RuntimeException("(添加延时队列失败)", e);
        }
    }
    /**
     * 获取延迟队列中的元素
     *
     * @param queueCode 队列键
     * @param <T>       元素类型
     * @return 队列中的元素
     */
    public <T> T getDelayQueue(String queueCode) throws InterruptedException {
        RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        return blockingDeque.take();
    }
}

Redis延迟队列运行器

用于在Spring Boot启动后监听各个延迟队列,并在线程池中执行对应的业务逻辑。

package com.zhou.demo.runner;
import com.zhou.demo.enums.DelayQueueEnum;
import com.zhou.demo.hander.DelayQueueHandler;
import com.zhou.demo.utils.RedisDelayQueueUtil;
import com.zhou.demo.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Redis延迟队列运行器
 * 用于在Spring Boot启动后监听各个延迟队列,并在线程池中执行对应的业务逻辑。
 *
 * @author zhouquan
 */
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {
    @Resource
    private RedisDelayQueueUtil redisDelayQueueUtil;
    /**
     * 线程池,用于并发监听不同的延迟队列
     */
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    /**
     * 运行状态标志,控制线程是否持续监听队列
     */
    private volatile boolean running = true;
    /**
     * Spring Boot 启动完成后自动运行的方法
     * 遍历所有延迟队列枚举,为每个队列创建一个监听线程
     *
     * @param args 命令行参数
     */
    @Override
    public void run(String... args) {
        for (DelayQueueEnum queueEnum : DelayQueueEnum.values()) {
            executorService.execute(() -> {
                log.info("启动延迟队列监听线程:{}", queueEnum.getCode());
                while (running) {
                    try {
                        Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
                        DelayQueueHandler handler = SpringUtils.getBean(queueEnum.getBeanClass());
                        handler.execute(value);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        log.warn("线程中断:{}", queueEnum.getCode());
                    } catch (Exception ex) {
                        log.error("延迟队列 [{}] 处理异常:{}", queueEnum.getCode(), ex.getMessage(), ex);
                    }
                }
            });
        }
        log.info("所有 Redis 延迟队列监听启动完成");
    }
    /**
     * 在 Bean 销毁前关闭线程池,释放资源
     */
    @PreDestroy
    public void shutdown() {
        log.info("准备关闭 Redis 延迟队列监听线程池");
        running = false;
        executorService.shutdownNow();
    }
}

业务枚举类

package com.zhou.demo.enums;
import com.zhou.demo.hander.EvaluationTimeoutHandler;
import com.zhou.demo.hander.OrderPaymentTimeoutHandler;
import com.zhou.demo.hander.DelayQueueHandler;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
 * 延迟队列业务枚举
 *
 * @author 18324
 */
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum DelayQueueEnum {
    /**
     * 订单超时
     */
    ORDER_PAYMENT_TIMEOUT("order_payment_timeout", "订单支付超时", OrderPaymentTimeoutHandler.class),
    /**
     * 评价超时
     */
    EVALUATION_TIMEOUT("evaluation_timeout", "评价超时", EvaluationTimeoutHandler.class);
    /**
     * 延迟队列 Redis Key
     */
    private String code;
    /**
     * 中文描述
     */
    private String name;
    /**
     * 延迟队列具体业务实现的 Bean
     * 可通过 Spring 的上下文获取
     */
    private Class<? extends DelayQueueHandler<Long>> beanClass;
}

测试接口类

package com.zhou.demo.enums;
import com.zhou.demo.hander.EvaluationTimeoutHandler;
import com.zhou.demo.hander.OrderPaymentTimeoutHandler;
import com.zhou.demo.hander.DelayQueueHandler;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
 * 延迟队列业务枚举
 *
 * @author 18324
 */
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum DelayQueueEnum {
    /**
     * 订单超时
     */
    ORDER_PAYMENT_TIMEOUT("order_payment_timeout", "订单支付超时", OrderPaymentTimeoutHandler.class),
    /**
     * 评价超时
     */
    EVALUATION_TIMEOUT("evaluation_timeout", "评价超时", EvaluationTimeoutHandler.class);
    /**
     * 延迟队列 Redis Key
     */
    private String code;
    /**
     * 中文描述
     */
    private String name;
    /**
     * 延迟队列具体业务实现的 Bean
     * 可通过 Spring 的上下文获取
     */
    private Class<? extends DelayQueueHandler<Long>> beanClass;
}

延迟队列任务测试

源码地址

https://gitee.com/zhouquanstudy/springboot-redisson-delayqueue

参考

[1]. SpringBoot集成Redisson实现延迟队列_redisson delayedqueue

[2]. 请勿过度依赖Redis的过期监听业务

到此这篇关于Spring Boot 项目集成 Redisson 实现延迟队列的文章就介绍到这了,更多相关Spring Boot Redisson延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文