java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java Netty时间轮

Java利用Netty时间轮实现延时任务

作者:字母哥哥

时间轮是一种可以执行定时任务的数据结构和算法。本文将为大家详细讲解一下Java如何利用Netty时间轮算法实现延时任务,感兴趣的小伙伴可以了解一下

一、时间轮算法简介

为了大家能够理解下文中的代码,我们先来简单了解一下netty时间轮算法的核心原理

时间轮算法名副其实,时间轮就是一个环形的数据结构,类似于表盘,将时间轮分成多个bucket(比如:0-8)。假设每个时间轮轮片的分隔时间段tickDuration=1s(即:指针经过每个格子花费时间是 1 s),当前的时间bucket=3,那么在18秒后需要被执行的任务需要落到((3+18)%8=5取余运算)的5号bucket上。假如有多个需要在该时间段内执行的任务,就会组成一个双向链表。另外针对时间轮我们要有下面的几个认知:

时间轮指针是一个Worker线程,在时间轮整点的时候执行双向链表中的任务。

时间轮算法的并不是精准的延时,它的执行精度取决于每个时间轮轮片的分隔时间段tickDuration

Worker线程是单线程,一个bucket、一个bucket的顺序处理任务。「所以我们的延时任务一定要做成异步任务,否则会影响时间轮后续任务的执行时间。」

二、时间轮hello-world

实现一个延时任务的例子,需求仍然十分的简单:你买了一张火车票,必须在30分钟之内付款,否则该订单被自动取消。「订单30分钟不付款自动取消,这个任务就是一个延时任务。」 我们的火车票订单取消任务,从需求上看并不需要非常精准的延时,所以是可以使用时间轮算法来完成这个任务的。

首先通过maven坐标引入netty

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.45.Final</version>
</dependency>

然后我们创建一个时间轮,如果是Spring的开发环境,我们可以这么做。下文中我们new了一个包含512个bucket的时间轮,每个时间轮的轮片时间间隔是100毫秒。

@Bean("hashedWheelTimer")
public HashedWheelTimer hashedWheelTimer(){
    return new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
}

举例:当用户买火车票下单的时候,向时间轮中添加一个30分钟的延时任务。延时任务将在30分钟之后被执行,下文的lambda表达式部分实现了一个TimerTask(task)延时任务。这个延时任务的函数体内,请一定使用异步任务,即:单独起一个线程或者使用SpringBoot异步任务线程池。因为Worker线程是单线程的,你的任务处理时间长于tickDuration会妨碍后续时间轮轮片上的任务的执行。

//订单下单操作
void order(String orderInfo) {
  //下单的时候,向时间轮中添加一个30分钟的延时任务
  hashedWheelTimer.newTimeout(task -> {
    //注意这里使用异步任务线程池或者开启线程进行订单取消任务的处理
    cancelOrder(orderInfo);
  }, 30, TimeUnit.MINUTES);
}

三、异步任务线程池

我们在上文中已经多次强调,时间轮的任务TimerTask的执行内容要做成异步的。最简单的做法就是接到一个任务之后启动一个线程处理该任务。在Spring环境下其实我们有更好的选择,就是使用Spring的线程池,这个线程池是可以自定义的。比如:下文中的用法是我事先定义了一个名字为test的线程池,然后通过@Async使用即可。

@Async("test")
public void cancelOrder(String orderInfo){
  //查询订单支付信息,如果用户未支付,关闭订单
}

可能有的朋友,还不知道该如何自定义一个Spring线程池,可以参考:我之前写过一个SpringBoot的**「可观测、易配置」**的线程池开源项目,源代码地址:https://gitee.com/hanxt/zimug-monitor-threadpool。我的这个zimug-monitor-threadpool开源项目,可以做到对线程池使用情况的监控,我自己平时用的效果还不错,向大家推荐一下!

四、时间轮优缺点

时间轮算法实现延时任务的优点就是,相对于使用JDK的DelayQueue,其算法上具有优势,执行性能相对好一些。其缺点就是所有的延时任务以及延时触发的管理,都是在单个应用服务的内存中进行的,一旦该应用服务发生故障重启服务,时间轮任务数据将全部丢失。这一缺点和DelayQueue是一样的。为了解决这个问题,我们可以使用redis、RocketMQ等分布式中间件来管理延时任务消息的方式来实现延时任务,这个我会在后续的文章中为大家介绍。

知识点补充

下面主要和大家一起来分析下Netty时间轮调度算法的原理

时间轮状态

时间轮有以下三种状态:

状态转换如下:

构造函数

    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // 初始化时间轮数组,时间轮大小为大于等于 ticksPerWheel 的第一个 2 的幂,和 HashMap 类似
        wheel = createWheel(ticksPerWheel);
        // 取模用,用来定位数组中的槽
        mask = wheel.length - 1;

        // 为了保证精度,时间轮内的时间单位为纳秒
        long duration = unit.toNanos(tickDuration);

        // 时间轮内的时钟拨动频率不宜太大也不宜太小
        if (duration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }

        if (duration < MILLISECOND_NANOS) {
            logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
                        tickDuration, MILLISECOND_NANOS);
            this.tickDuration = MILLISECOND_NANOS;
        } else {
            this.tickDuration = duration;
        }

        // 创建工作线程
        workerThread = threadFactory.newThread(worker);


        // 非守护线程且 leakDetection 为 true 时检测内存是否泄漏
        leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

        // 初始化最大等待任务数
        this.maxPendingTimeouts = maxPendingTimeouts;

        // 如果创建的时间轮实例大于 64,打印日志,并且这个日志只会打印一次
        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

构造函数中的参数相当重要,当自定义时间轮时,我们应该根据业务的范围设置合理的参数:

时间轮的时钟拨动时长应该根据业务设置恰当的值,如果设置的过大,可能导致任务触发时间不准确。如果设置的过小,时间轮转动频繁,任务少的情况下加载不到任务,属于一直空转的状态,会占用 CPU 线程资源。

为了防止时间轮占用过多的 CPU 资源,当创建的时间轮对象大于 64 时会以日志的方式提示。

构造函数中只是初始化了轮线程,并没有开启,当第一次往时间轮内添加任务时,线程才会开启。

到此这篇关于Java利用Netty时间轮实现延时任务的文章就介绍到这了,更多相关Java Netty时间轮内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文