SpringBoot中Redisson延迟队列的示例
作者:Frank-fu
延时队列是一种常见的需求,延时队列允许我们延迟处理某些任务,本文主要介绍了Redisson延迟队列的示例,具有一定的参考价值,感兴趣的可以了解一下
场景:
需求:
支付的二维码,超过两个小时以后,如果还未支付,则自动转为取消支付,或者支付超时的状态
需求分析:
1,动态定时任务:
每个支付的二维码创建的时候,创建一个动态的定时任务,两个小时候自动执行,更新支付状态,可以解决这个问题。
(1)持久化:
如果服务重启了,动态定时任务会丢失,导致部分数据没办法更新状态。
(2)分布式:
如果当服务重启时,自动扫描数据,重新计算时间,再次创建动态定时任务。可以解决(1)的问题,但是当分布式,多个节点的时候,都会重新加载所有的任务,这样性能上不是最优解,只能在数据源上加上节点名称,不同的服务节点,加载属于自己的定时任务,可以解决这个问题。总的想想,太麻烦了,还是算了。
2,Redisson延迟队列
(1)持久化:队列信息放在Redis上,服务重启不影响。
(2)分布式:多节点去Redis拿去数据,谁抢到算谁的,不会存在同一个任务,多个节点支持。唯一不足就是过度依赖Redis,万一Redis崩了,那就凉凉了(那就是要把Redis配置高可用,当前业务就不用管了)。总体来说还是比较好用的。
实现
1,创建延迟队列的监听任务【RedisDelayedQueueListener】,消费延迟队列
2,创建新增延迟队列的类,用于创建延迟队列
3,整体初始化,把监听任务与spring绑定,扫描各个监听延迟队列的实现类,并开启单独线程,监听任务。
4,创建延迟任务(开始测试使用)
连接Redis
不贴代码了,自己在网上搜
监听延迟队列
接口:
/** * 队列事件监听接口,需要实现这个方法 * * @module * @author frank * @date 2021/8/19 10:50 */ public interface RedisDelayedQueueListener<T> { /** * 执行方法 * * @param t */ void invoke(T t); }
实现:
import com.sxmaps.netschool.common.redisson.RedisDelayedQueueListener; import com.sxmaps.netschool.service.vo.school.SchoolAccountPayStateReqVO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 支付二维码监听器 * * @module * @author frank * @date 2021/8/19 10:49 */ @Component public class PayQCordListener implements RedisDelayedQueueListener<SchoolAccountPayStateReqVO> { private final Logger logger = LoggerFactory.getLogger(PayQCordListener.class); @Autowired private SchoolAccountService schoolAccountService; @Override public void invoke(SchoolAccountPayStateReqVO payStateReqVO) { logger.info("支付二维码-延迟失效,内容:{}", payStateReqVO); //处理业务,更新二维码状态 logger.info("支付二维码-延迟失效,内容:{},处理结果:{}", payStateReqVO,respDTO); } }
增加延迟队列
import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; /** * 增加延迟信息 * * @author frank * @module * @date 2021/8/19 10:49 */ @Component public class RedisDelayedQueue { private final Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class); @Autowired RedissonClient redissonClient; /** * 添加队列 * * @param t DTO传输类 * @param delay 时间数量 * @param timeUnit 时间单位 * @param <T> 泛型 */ private <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) { logger.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}" , queueName, delay, timeUnit,t); RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName); RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(t, delay, timeUnit); } /** * 添加队列-秒 * * @param t DTO传输类 * @param delay 时间数量 * @param <T> 泛型 */ public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) { addQueue(t, delay, TimeUnit.SECONDS, clazz.getName()); } /** * 添加队列-分 * * @param t DTO传输类 * @param delay 时间数量 * @param <T> 泛型 */ public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) { addQueue(t, delay, TimeUnit.MINUTES, clazz.getName()); } /** * 添加队列-时 * * @param t DTO传输类 * @param delay 时间数量 * @param <T> 泛型 */ public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) { addQueue(t, delay, TimeUnit.HOURS, clazz.getName()); } /** * 添加队列-天 * * @param t DTO传输类 * @param delay 时间数量 * @param <T> 泛型 */ public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) { addQueue(t, delay, TimeUnit.DAYS, clazz.getName()); } }
整体初始化
import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.util.Map; /** * 初始化队列监听 * * @module * @author frank * @date 2021/8/19 10:49 */ @Component public class RedisDelayedQueueInit implements ApplicationContextAware { private final Logger logger = LoggerFactory.getLogger(RedisDelayedQueueInit.class); @Autowired RedissonClient redissonClient; /** * 获取应用上下文并获取相应的接口实现类 * * @param applicationContext * @throws BeansException */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class); for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) { String listenerName = taskEventListenerEntry.getValue().getClass().getName(); startThread(listenerName, taskEventListenerEntry.getValue()); } } /** * 启动线程获取队列* * * @param queueName queueName * @param redisDelayedQueueListener 任务回调监听 * @param <T> 泛型 * @return */ private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) { RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName); //服务重启后,无offer,take不到信息。 redissonClient.getDelayedQueue(blockingFairQueue); //由于此线程需要常驻,可以新建线程,不用交给线程池管理 Thread thread = new Thread(() -> { logger.info("启动监听队列线程" + queueName); while (true) { try { T t = blockingFairQueue.take(); logger.info("监听队列线程,监听名称:{},内容:{}", queueName, t); redisDelayedQueueListener.invoke(t); } catch (Exception e) { logger.info("监听队列线程错误,", e); } } }); thread.setName(queueName); thread.start(); } }
创建延迟任务
@Autowired RedisDelayedQueue queue; ................. queue.addQueueHours(new SchoolAccountPayStateReqVO(dto.getPayNo()),2, PayQCordListener.class);
到此这篇关于Redisson延迟队列的示例的文章就介绍到这了,更多相关Redisson延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!