Redisson延迟队列实现订单关闭的操作方法
作者:皮卡丘学了没
Redisson的RDelayedQueue是实现订单到期关闭的利器,它比定时任务更精准、比Redis过期监听更可靠。其核心原理是利用Redis的**有序集合(Sorted Set)和发布/订阅(Pub/Sub)**功能,在分布式环境下可靠地执行延迟任务。
接下来,我们结合Java代码,一步步来看如何实现。
🚀 生产端:订单创建时,埋下“定时炸弹”
在用户下单成功后,我们需要做的不是立即启动一个计时器,而是将这笔订单的ID作为一条“延迟消息”发送出去。
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Service
public class OrderService {
@Resource
private RedissonClient redissonClient;
// 队列的名称,可以理解为用于存放待关闭订单的“信箱”
private static final String ORDER_QUEUE_KEY = "order-close-queue";
public void createOrder(Order order) {
// 1. 保存订单到数据库 (省略具体逻辑)
saveToDB(order);
log.info("订单 [{}] 创建成功,等待支付...", order.getId());
// 2. 将订单ID放入延迟队列,设置30分钟后“爆炸”
try {
// 获取一个阻塞队列,这是消费者最终要监听的目标队列
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(ORDER_QUEUE_KEY);
// 基于阻塞队列,创建一个延迟队列
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
// 将订单ID放入延迟队列,延迟30分钟
delayedQueue.offer(order.getId().toString(), 30, TimeUnit.MINUTES);
log.info("订单 [{}] 已放入延迟队列,30分钟后将自动关闭", order.getId());
} catch (Exception e) {
log.error("放入延迟队列失败", e);
// 这里可以考虑补偿机制,比如记录日志后由定时任务兜底
}
}
}核心逻辑:生产者调用delayedQueue.offer(),将任务(订单ID)和延迟时间(30分钟)告诉Redisson。Redisson客户端会把这个任务连同计算好的执行时间戳,一起存到Redis的一个**有序集合(ZSet)**中,并以时间戳作为排序的分数(score)。
🎧 消费端:时刻待命,准时“拆弹”
我们需要一个后台任务一直监听,一旦有订单到期,立刻执行关闭操作。
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class OrderCloseListener implements CommandLineRunner {
@Resource
private RedissonClient redissonClient;
@Resource
private OrderService orderService; // 注入你的订单Service
private static final String ORDER_QUEUE_KEY = "order-close-queue";
@Override
public void run(String... args) throws Exception {
// 程序启动后,在一个独立的线程中监听队列
new Thread(() -> {
log.info("订单关闭监听线程已启动,等待到期订单...");
// 获取与生产者相同的阻塞队列
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(ORDER_QUEUE_KEY);
// 注意:这里必须调用一次getDelayedQueue,目的是在消费端也初始化相关的监听器
// 虽然不调用也能工作,但官方推荐调用以保证可靠性
redissonClient.getDelayedQueue(blockingQueue);
while (true) {
try {
// 阻塞等待,直到有订单到期。take()方法会一直阻塞直到拿到数据
String orderId = blockingQueue.take();
log.info("收到到期订单ID:{},开始执行关闭操作", orderId);
// 执行真正的关单业务逻辑
orderService.closeExpiredOrder(Long.parseLong(orderId));
} catch (InterruptedException e) {
log.error("监听线程被中断", e);
Thread.currentThread().interrupt();
break; // 线程中断时退出循环
} catch (Exception e) {
log.error("处理到期订单时发生错误", e);
// 防止单个消息处理失败导致循环中断,继续监听下一个
}
}
}, "OrderClose-Listener").start();
}
}核心逻辑:消费者通过blockingQueue.take()阻塞地从Redis的**目标队列(List)**中获取消息。当一个任务在ZSet中的时间戳小于当前时间,Redisson的后台线程就会自动把它从ZSet移动到目标List中。这时,take()方法就会立即返回,拿到订单ID,执行关闭逻辑。
🔍 探秘Redisson的“内部时钟”机制
你可能会好奇,Redisson是如何精准地将到期任务从ZSet移到List的?这背后有一个巧妙的设计:
Redisson为每个延迟队列启动了一个后台轮询线程(基于Netty的时间轮实现)。这个线程会:
- 定期查询ZSet中分数最小(即最早到期)的任务。
- 如果该任务的到期时间戳小于当前时间,就把这个任务以及所有其他到期的任务,从ZSet和另一个辅助的List中移除,并推入到消费者正在监听的目标List中。
- 为了提高效率,当有新的、更早到期的任务加入时,Redisson会通过**发布/订阅(Pub/Sub)**功能发送一个通知,唤醒轮询线程立即工作,而不是等到下一个轮询周期。
所以,整个过程就像有一个精准的“闹钟”在帮你管理这些任务。
💡 实战要点与进阶思考
- 消息可靠性:所有任务数据都存储在Redis中,即使你的应用服务重启,已经提交但未到期的任务也不会丢失。Redis的持久化(RDB/AOF)机制为数据提供了最终保障。
- 分布式支持:生产者和消费者可以是完全不同的应用实例,只要它们连接同一个Redis、并使用相同的队列名称(如
order-close-queue)即可。多个消费者实例可以同时take同一个队列,实现任务的负载均衡。 - 时间精度:Redisson的默认轮询间隔约为5秒,这意味着任务的触发时间可能会有最多5秒的误差。如果业务对时间精度要求极高,可以通过配置
config.setScanInterval(2000)来缩短轮询间隔(单位为毫秒),但会略微增加Redis的压力。 - 业务兜底:作为一种最佳实践,即使有了延迟队列,也建议配合一个低频的定时任务(如每小时执行一次)作为最后的检查,去扫描那些极少数可能因为各种意外(如Redis故障)而未被关闭的订单,确保业务逻辑的最终一致性。
到此这篇关于Redisson延迟队列实现订单关闭的操作方法的文章就介绍到这了,更多相关redisson延迟队列订单关闭内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
