java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java限流任务执行器

基于Java手写一个通用限流任务执行器

作者:秋云编程

调用第三方API最怕什么,那就是怕被限流,今天分享一个自制的限流任务执行器,能帮你轻松控制请求频率,还能自动重试失败任务,指数退避不添乱,快跟随小编一起学习起来吧

调用第三方API最怕什么?怕被限流!今天分享一个自制的限流任务执行器,能帮你轻松控制请求频率,还能自动重试失败任务,指数退避不添乱。代码可直接复制到项目中使用~

背景痛点

很多场景下我们需要控制任务的执行速率:

今天要介绍的 RateLimitedExecutor 就是为解决这些问题而生的。

核心功能

限流执行 – 每秒最多执行N个任务,超出则排队等待

顺序保证 – 任务严格按照提交顺序执行

自动重试 – 失败后自动重试(可配置最大次数)

指数退避 – 支持退避延迟策略,避免加重服务端压力

异步返回 – 使用 CompletableFuture 获取结果,不阻塞主线程

类设计(完整代码)

import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.*;
import java.util.function.Function;

/**
 * 限流任务执行器:按固定速率顺序执行任务,支持重试。
 * @param <T> 任务返回值类型
 */
public class RateLimitedExecutor<T> {
    private final BlockingQueue<TaskWrapper<T>> queue = new LinkedBlockingQueue<>();
    private final RateLimiter rateLimiter;
    private final int maxRetries;
    private final long initialDelayMs;    // 首次重试延迟(毫秒)
    private final double backoffMultiplier; // 退避乘数(如2.0表示每次翻倍)
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "RateLimitedExecutor-Worker");
        t.setDaemon(true);  // 设为守护线程,避免阻止JVM退出
        return t;
    });
    private volatile boolean running = true;

    /**
     * 构造限流执行器
     * @param permitsPerSecond 每秒允许执行的任务数(如1.0表示每秒1次)
     * @param maxRetries       最大重试次数(不含首次执行)
     * @param initialDelayMs   首次重试延迟毫秒数
     * @param backoffMultiplier 退避乘数(1.0表示固定延迟,>1.0表示指数退避)
     */
    public RateLimitedExecutor(double permitsPerSecond, int maxRetries,
                               long initialDelayMs, double backoffMultiplier) {
        this.rateLimiter = RateLimiter.create(permitsPerSecond);
        this.maxRetries = maxRetries;
        this.initialDelayMs = initialDelayMs;
        this.backoffMultiplier = backoffMultiplier;
        worker.submit(this::process);
    }

    /**
     * 提交一个任务,返回CompletableFuture异步获取结果
     * @param task 需要执行的任务(Callable)
     * @return 代表异步结果的CompletableFuture
     */
    public CompletableFuture<T> submit(Callable<T> task) {
        CompletableFuture<T> future = new CompletableFuture<>();
        queue.offer(new TaskWrapper<>(task, future));
        return future;
    }

    // 工作线程主循环
    private void process() {
        while (running) {
            try {
                TaskWrapper<T> wrapper = queue.take(); // 阻塞直到有任务
                executeWithRetry(wrapper);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    // 执行单个任务(带重试)
    private void executeWithRetry(TaskWrapper<T> wrapper) {
        int retries = 0;
        long delay = initialDelayMs;
        while (retries <= maxRetries) {
            // 限流:获取令牌,若不足则阻塞
            rateLimiter.acquire();

            try {
                T result = wrapper.task.call();
                wrapper.future.complete(result);
                return; // 成功,结束
            } catch (Exception e) {
                retries++;
                if (retries > maxRetries) {
                    wrapper.future.completeExceptionally(e);
                    return;
                }
                // 重试等待(退避)
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    wrapper.future.completeExceptionally(ie);
                    return;
                }
                // 更新下次重试延迟
                delay = (long) (delay * backoffMultiplier);
            }
        }
    }

    /**
     * 优雅关闭执行器:等待已提交任务执行完毕,不再接受新任务
     */
    public void shutdown() {
        running = false;
        worker.shutdown(); // 不再接受新任务
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                worker.shutdownNow();
            }
        } catch (InterruptedException e) {
            worker.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 立即关闭执行器,尝试中断正在执行的任务
     */
    public void shutdownNow() {
        running = false;
        worker.shutdownNow();
    }

    // 内部任务包装类
    private static class TaskWrapper<T> {
        final Callable<T> task;
        final CompletableFuture<T> future;

        TaskWrapper(Callable<T> task, CompletableFuture<T> future) {
            this.task = task;
            this.future = future;
        }
    }
}

依赖要求

项目需要引入 Guava(提供 RateLimiter):

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>32.1.2-jre</version>
</dependency>

构造参数说明

参数类型说明
permitsPerSeconddouble每秒允许执行的任务数。例:1.0→每秒1次,0.5→每2秒1次
maxRetriesint最大重试次数(不含首次执行)。0表示不重试
initialDelayMslong首次重试前的等待时间(毫秒)
backoffMultiplierdouble退避乘数。2.0→每次延迟翻倍;1.0→固定延迟

核心方法

CompletableFuture<T> submit(Callable<T> task):提交任务,返回 CompletableFuture,可异步获取结果或异常。

void shutdown():优雅关闭:等待已提交任务执行完毕,不再接受新任务。

void shutdownNow():立即关闭:尝试中断当前执行的任务。

注意事项

执行器内部使用 单线程 处理任务,严格保证提交顺序

限流基于 RateLimiter,每次执行前阻塞直到获取令牌,因此即使任务执行时间极短,也能保证速率限制。

重试期间工作线程会阻塞等待,后续任务不会提前执行,顺序性得以保持。

工作线程默认设为守护线程,当所有用户线程结束时 JVM 会自动退出,无需手动关闭。但建议在应用关闭时调用 shutdown() 以确保任务完整执行。

实战示例:调用淘宝IP接口

假设淘宝IP接口地址为 http://ip.taobao.com/outGetIpInfo?ip={ip},我们需要:

Demo 代码

public class TaobaoIpDemo {
    public static void main(String[] args) throws Exception {
        // 创建限流执行器:每秒1次,重试3次,首次延迟1秒,指数退避2.0
        RateLimitedExecutor<String> executor = new RateLimitedExecutor<>(
            1.0,    // 每秒1次
            3,      // 重试3次
            1000,   // 首次延迟1秒
            2.0     // 指数退避
        );

        // 需要查询的IP列表
        String[] ips = {"8.8.8.8", "114.114.114.114", "223.5.5.5"};

        // 提交所有任务
        for (String ip : ips) {
            CompletableFuture<String> future = executor.submit(() -> queryIp(ip));

            // 异步处理结果
            future.thenAccept(result -> {
                System.out.println("IP: " + ip + ", 结果: " + result);
            }).exceptionally(ex -> {
                System.err.println("IP: " + ip + ", 查询失败: " + ex.getMessage());
                return null;
            });
        }

        // 等待所有任务完成(实际应用中不需要,这里仅演示)
        Thread.sleep(10000);

        // 优雅关闭
        executor.shutdown();
    }

    private static String queryIp(String ip) {
        String url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip=" + ip;
        return RestClient.create()
                .get()
                .uri(url)
                .retrieve()
                .body(String.class);
    }
}

适用场景

总结

这个轻量级的限流任务执行器,代码简洁、功能完整,能帮你轻松解决速率控制 + 顺序执行 + 自动重试三大问题。配合 CompletableFuture 异步编程,性能与体验兼得。

如果你也在为API限流或任务重试头疼,不妨复制这份代码到项目中试试~

本文代码已脱敏,可放心复制到生产项目。Guava 版本建议使用 30.0 以上。

以上就是基于Java手写一个通用限流任务执行器的详细内容,更多关于Java限流任务执行器的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文