Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis延时任务定时任务

Redis 延时任务实现及与定时任务区别详解

作者:Dream_sky

这篇文章主要为大家介绍了Redis 延时任务实现及与定时任务区别详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

1. 生成订单30分钟未支付,则自动取消

2. 30分钟未回复,则结束会话

对上述的任务,我们给一个专业的名字来形容,那就是延时任务

一、延时任务是什么

延时任务不同于一般的定时任务,延时任务是在某事件触发后的未来某个时刻执行,没有重复的执行周期。

二、延时任务和定时任务的区别是什么

定时任务一般执行的是批处理多个任务,而延时任务一般是单任务处理

三、技术对比

本文主要讲解Redis的Zset实现延时任务,其他方案只做介绍

1.数据库轮询

通过定时组件的去扫描数据库,通过时间来判断是否有超时的订单,然后进行update或delete等操作

优点:

简单易行

缺点:

2.JDK的延迟队列

利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中,是必须实现Delayed接口的。

优点:实现简单,效率高,任务触发时间延迟低。

缺点:

3.时间轮算法

时间轮TimingWheel是一种高效、低延迟的调度数据结构,底层采用数组实现存储任务列表的环形队列,示意图如下:时间轮

时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样可以看出定时轮由个3个重要的属数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。

如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)

优点:效率高,任务触发时间延迟时间比delayQueue低

缺点:

4.使用消息队列

使用RabbitMQ死信队列依赖于RabbitMQ的两个特性:TTL和DLX。

TTL:Time To Live,消息存活时间,包括两个维度:队列消息存活时间和消息本身的存活时间。

DLX:Dead Letter Exchange,死信交换器。

优点:异步交互可以削峰,高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。

缺点:

1.本身的易用度要依赖于rabbitMq的运维.因为要引用rabbitMq,所以复杂度和成本变高

2.RabbitMq是一个消息中间件;延迟队列只是其中一个小功能,如果团队技术栈中本来就是使用RabbitMq那还好,如果不是,那为了使用延迟队列而去部署一套RabbitMq成本有点大;

5.Redis的Zset实现延时任务

为什么采用Redis的ZSet实现延迟任务?

zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序

5.1 思路分析

5.2 Redis中Zset的简单介绍及使用

Redis 有序集合是 string 类型元素的集合,且不允许重复的成员。每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为集合中的成员进行从小到大的排序。有序集合的成员是唯一的,但分数(score)却可以重复。

常用命令

java中操作简单介绍

    1.add(K key, V value, double score)
    添加元素到变量中同时指定元素的分值。
    redisTemplate.opsForZSet().add("zSetValue","A",1);
    2.rangeByScore(K key, double min, double max)
    根据设置的score获取区间值。
    zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,2);
    3.rangeByScore(K key, double min, double max,long offset, long count)
    根据设置的score获取区间值从给定下标和给定长度获取最终值。
    zSetValue = redisTemplate.opsForZSet().rangeByScore("zSetValue",1,5,1,3);
    4.rangeWithScores(K key, long start, long end)
    获取RedisZSetCommands.Tuples的区间值。
    Set<ZSetOperations.TypedTuple<Object>> typedTupleSet = redisTemplate.opsForZSet().rangeWithScores("typedTupleSet",1,3);
    Iterator<ZSetOperations.TypedTuple<Object>> iterator = typedTupleSet.iterator();
    while (iterator.hasNext()){
        ZSetOperations.TypedTuple<Object> typedTuple = iterator.next();
        Object value = typedTuple.getValue();
        double score = typedTuple.getScore();
      }
    5.删除成员
    redisTemplate.opsForZSet().remove("myZset","a","b");

以下代码可以直接使用-基于Spring Boot项目

5.3 延时队列工厂

代码中注释有详细介绍

/**
 * 延时队列工厂
 *
 **/
@Slf4j
public abstract class AbstractDelayQueueMachineFactory {
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private ThreadPoolTaskExecutor asyncTaskExecutor;
    /**
     * 插入任务id
     *
     * @param jobId 任务id(队列内唯一)
     * @param time  延时时间(单位 :毫秒)
     * @return 是否插入成功
     */
    public boolean addJob(String jobId, Integer time) {
        Calendar instance = Calendar.getInstance();
        //增加延时时间,获取最终触发时间
        instance.add(Calendar.MILLISECOND, time); 
        long delayMillisecond = instance.getTimeInMillis();
        log.info("延时队列添加问题{}",jobId);
        return redisUtil.zAdd(setDelayQueueName(), delayMillisecond, jobId);
    }
    /**
     * 删除任务id
     *
     * @param jobId 任务id(队列内唯一)
     */
    public boolean removeJob(String jobId) {
        Long num = redisUtil.zRemove(setDelayQueueName(), jobId);
        if (num > 0) return true;
        return false;
    }
    /**
     * 延时队列机器开始运作
     */
    private void startDelayQueueMachine() {
        log.info("延时队列{}开始启动", setDelayQueueName());
        // 监听redis队列
        while (true) {
            try {
                // 获取当前时间前的任务列表
                Set<ZSetOperations.TypedTuple<Object>> tuples = redisUtil.zRangeByScore(setDelayQueueName(), 0, System.currentTimeMillis() );
                // 如果任务不为空
                if (!CollectionUtils.isEmpty(tuples)) {
                    log.info("延时任务开始执行:{}", JSONUtil.toJsonStr(tuples));
                    Iterator<ZSetOperations.TypedTuple<Object>> iterator = tuples.iterator();
                    while (iterator.hasNext()){
                        ZSetOperations.TypedTuple<Object> typedTuple = iterator.next();
                        String questionId = Convert.toStr(typedTuple.getValue());
                        // 移除缓存,如果移除成功则表示当前线程处理了延时任务,则执行延时任务
                        // 删除成功才执行延时任务,否则不执行,这样可以避免分布式系统延时任务多次执行
                        Long num = redisUtil.zRemove(setDelayQueueName(), questionId);
                        // 如果移除成功, 则执行
                        if (num > 0) {
                            asyncTaskExecutor.execute(() -> invoke(questionId));
                        }
                    }
                }
            } catch (Exception e) {
                log.error("处理延时任务发生异常,异常原因为{}", e.getMessage(), e);
            } finally {
                // 间隔()分钟执行一次
                //根据业务场景设置对应时间
                try {
                    TimeUnit.MINUTES.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 最终执行的任务方法
     *
     * @param jobId 任务id
     */
    public abstract void invoke(String jobId);
    /**
     * 要实现延时队列的名字
     */
    public abstract String setDelayQueueName();
    //Spring Boot初始化时开启一条线程运行
    @PostConstruct
    public void init() {
        new Thread(this::startDelayQueueMachine).start();
    }
}

 addJob方法是添加任务id和延时时间(单位毫秒)

redisUtil.zRangeByScore ::根据设置的score获取区间值

@PostConstruct注解:是针对Bean的初始化完成之后做一些事情,比如注册一些监听器..(初始化实现方案有很多可自行选择)

为什么先删除后执行业务逻辑?

删除成功才执行延时任务,否则不执行,这样可以避免分布式系统延时任务多次执行

5.4 RedisUtil工具类

@Component
@Slf4j
public class RedisUtil {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    /**
     * 向Zset里添加成员
     *
     * @param key   key值
     * @param score 分数,通常用于排序
     * @param value 值
     * @return 增加状态
     */
    public boolean zAdd(String key, long score, String value) {
        Boolean result = redisTemplate.opsForZSet().add(key, value, score);
        return result;
    }
    /**
     * 获取 某key 下 某一分值区间的队列
     *
     * @param key  缓存key
     * @param from 开始时间
     * @param to   结束时间
     * @return 数据
     */
    public Set<ZSetOperations.TypedTuple<Object>> zRangeByScore(String key, int from, long to) {
        Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeByScoreWithScores(key, from, to);
        return set;
    }
    /**
     * 移除 Zset队列值
     *
     * @param key   key值
     * @param value 删除的集合
     * @return 删除数量
     */
    public Long zRemove(String key, String... value) {
        return redisTemplate.opsForZSet().remove(key, value);
    }
}

5.5 测试延时队列

继承上文中的延时队列工厂重写invoke(处理业务)和setDelayQueueName--延时队列名称也就是Zset中的key值

/**
 * 测试延时队列
 *
 */
@Slf4j
@Component
public class DelayQueue extends AbstractDelayQueueMachineFactory {
    @Autowired
    private ZnjExpertConsultQuestionRecordMapper questionRecordMapper;
    /** 
     * 处理业务逻辑
     */ 
    @Override
    public void invoke(String jobId) {
        Integer questionId = Convert.toInt(jobId);
        ZnjExpertConsultQuestionRecordEntity questionRecordEntity = questionRecordMapper.selectById(questionId);
        Boolean flag = znjExpertConsultService.whetherEnd(questionRecordEntity);
   /** 
    * 延时队列名统一设定
    */ 
    @Override
    public String setDelayQueueName() {
        return "expert_consult:delay_queue";
    }
}

运行成功,当Redis中有任务时,则执行任务即可

四、总结

使用redis zset来实现延时任务,总体类说是可行的

以上就是Redis 延时任务实现及与定时任务区别详解的详细内容,更多关于Redis延时任务定时任务的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文