java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java线程池并发度控制

Java滑动窗口实现线程池并发度控制的方法详解

作者:桦说编程

Java并发线程池是一种用于管理和复用线程的机制,它简化了线程的生命周期管理,提高了资源利用率和程序响应速度,这篇文章主要介绍了Java滑动窗口实现线程池并发度控制的相关资料,需要的朋友可以参考下

前言

本文深入解析了如何使用 ExecutorCompletionService 和 Guava ListenableFuture 实现并发度可控的任务执行器,帮助你掌握生产级并发编程的核心技巧。

问题背景

在高并发场景下,我们可能面临这样的需求(仅为例子):

场景:火车票查询系统,用户输入北京到上海,需要查询100个车次的余票信息。

需求

传统方案的痛点:

方案问题
ExecutorService.invokeAll()阻塞等待所有任务完成,无法立即返回
CompletableFuture.allOf()无法控制并发度,100个请求同时发出触发限流
自己用 Semaphore代码复杂,需要手动管理信号量和异常
分批执行(10个一批)任务执行时间不均匀,短任务等待长任务,资源利用率低

优化方案:滑动窗口 - 初始提交10个,每完成一个立即补充下一个,保持10个并发。

代码实现

/**
 * 并发限制执行器
 * <p>
 * 核心功能:控制任务并发度,采用滑动窗口策略
 * <p>
 * 知识点:
 * 1. ExecutorCompletionService - 按完成顺序获取任务结果
 * 2. ListenableFuture - Guava 可监听 Future
 * 3. SettableFuture - 可手动设置结果的 Future
 * 4. 滑动窗口并发控制 - 初始提交N个,每完成一个立即补充下一个
 *
 * @author 桦说编程
 */
@Value
public class ConcurrentLimitExecutor<V> {
    ListeningExecutorService pool;
    int parallelism;
    BlockingQueue<Future<V>> q;
    ExecutorCompletionService<V> cs = new ExecutorCompletionService<>(pool, q);
    ListeningExecutorService submitter = MoreExecutors.listeningDecorator(
            Executors.newSingleThreadExecutor());
    /**
     * 提交所有任务,立即返回Future列表
     *
     * @param tasks 待执行的任务列表
     * @return Future列表,顺序与tasks一致
     */
    @SuppressWarnings("unchecked")
    public List<ListenableFuture<V>> submitAll(List<Callable<V>> tasks) {
        if (tasks.isEmpty()) {
            return ImmutableList.of();
        }
        // 创建结果占位符,数量等于任务数量
        List<SettableFuture<V>> result = IntStream.range(0, tasks.size())
                .mapToObj(__ -> SettableFuture.<V>create())
                .collect(toImmutableList());
        // 首批提交数量
        int start = Math.min(tasks.size(), parallelism);
        // 提交首批任务
        for (int i = 0; i < start; i++) {
            ListenableFuture<V> f = (ListenableFuture<V>) cs.submit(tasks.get(i));
            linkFuture(f, result.get(i));
        }
        // 异步提交剩余任务
        submitter.submit(() -> submitRemaining(tasks, result, start));
        return (List<ListenableFuture<V>>) (List<?>) result;
    }
    /**
     * 提交剩余任务(在独立线程中执行)
     * <p>
     * 流程:
     * 1. 等待任意任务完成(cs.take())
     * 2. 提交下一个任务
     * 3. 将新任务的Future链接到result对应位置
     * 4. 重复直到所有任务提交完
     */
    @SneakyThrows
    private void submitRemaining(List<Callable<V>> tasks,
                                 List<SettableFuture<V>> result,
                                 int start) {
        int index = start;
        int size = tasks.size();
        while (index < size) {
            // 阻塞等待任意任务完成(释放一个并发槽位)
            cs.take();
            // 提交下一个任务
            ListenableFuture<V> f = (ListenableFuture<V>) cs.submit(tasks.get(index));
            // 链接结果到对应位置
            linkFuture(f, result.get(index));
            index++;
        }
    }
    private void linkFuture(ListenableFuture<V> from, SettableFuture<V> to) {
        to.setFuture(from);
    }
}

关键设计点

设计元素说明
result 数量等于 tasks.size(),每个任务都有对应的 Future
linkFuture将 cs 返回的 Future 链接到 result 中对应位置
异步补充使用 submitter 线程池异步提交剩余任务,不阻塞调用方
立即返回submitAll() 立即返回,上层可用 Futures.allAsList() 组合

核心知识点

1. ExecutorCompletionService

JDK提供的任务完成服务,核心能力是按完成顺序获取结果

ExecutorService pool = Executors.newFixedThreadPool(10);
ExecutorCompletionService<Train> cs = new ExecutorCompletionService<>(pool);
// 提交任务
cs.submit(() -> queryTrain("G1"));
cs.submit(() -> queryTrain("G2"));
// 阻塞获取最先完成的结果(而不是提交顺序)
Future<Train> first = cs.take();  // 谁先完成就拿到谁
Future<Train> second = cs.take();

原理

┌─────────────────────────────────────────────┐
│      ExecutorCompletionService              │
├─────────────────────────────────────────────┤
│  - executor: ExecutorService                │
│  - completionQueue: BlockingQueue<Future>   │
├─────────────────────────────────────────────┤
│  + submit(Callable): Future                 │
│  + take(): Future  // 阻塞获取已完成的      │
│  + poll(): Future  // 非阻塞获取            │
└─────────────────────────────────────────────┘
         │
         │ 任务完成时自动放入队列
         ▼
┌─────────────────────────────────────────────┐
│     BlockingQueue<Future<V>>                │
│  ┌──────┐  ┌──────┐  ┌──────┐              │
│  │Future│  │Future│  │Future│  ...         │
│  └──────┘  └──────┘  └──────┘              │
│   已完成    已完成    已完成                 │
└─────────────────────────────────────────────┘

关键点

2. Guava ListenableFuture

标准Future的增强版,支持回调监听

// 标准Future:只能轮询或阻塞
Future<Train> future = executor.submit(() -> queryTrain("G1"));
Train result = future.get(); // 阻塞
// ListenableFuture:注册回调
ListeningExecutorService executor = MoreExecutors.listeningDecorator(pool);
ListenableFuture<Train> future = executor.submit(() -> queryTrain("G1"));
// 异步回调(不阻塞)
future.addListener(() -> {
    System.out.println("G1查询完成!");
}, directExecutor());
// 或使用Futures工具类
Futures.addCallback(future, new FutureCallback<Train>() {
    public void onSuccess(Train result) { ... }
    public void onFailure(Throwable t) { ... }
}, executor);

核心价值

3. SettableFuture

可手动设置结果的Future,类似CompletableFuture

SettableFuture<Train> future = SettableFuture.create();
// 在其他线程设置结果
future.set(train);                       // 正常完成
future.setException(new TimeoutException());  // 异常完成
future.setFuture(otherFuture);           // 链接另一个Future
// 调用方正常使用
Train result = future.get();

4. 并发度控制核心思路

滑动窗口策略

假设 parallelism=3, tasks=10(查询10个车次)
初始状态:提交前3个车次到线程池
┌───┬───┬───┐ ┌───┬───┬───┬───┬───┬───┬───┐
│ G1│ G2│ G3│ │ G4│ G5│ G6│ G7│ G8│ G9│G10│
└───┴───┴───┘ └───┴───┴───┴───┴───┴───┴───┘
  执行中          等待中
G1 完成 → 立即提交 G4
┌───┬───┬───┐ ┌───┬───┬───┬───┬───┬───┐
│   │ G2│ G3│ │ G4│ G5│ G6│ G7│ G8│ G9│G10│
└───┴───┴───┘ └───┴───┴───┴───┴───┴───┴───┘
      执行中        等待中
G2 完成 → 立即提交 G5
...
维持窗口大小 = parallelism,任务完成后立即补充新任务

实现要点

使用场景

场景说明
批量查询接口火车票、机票、酒店查询,下游限流保护
批量缓存查询Redis 连接池有限,控制并发数
批量 IO 操作文件读写、数据库查询
爬虫系统限制爬取速率,避免被封

总结

核心优势

到此这篇关于Java滑动窗口实现线程池并发度控制的文章就介绍到这了,更多相关Java线程池并发度控制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文