java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java CompletionService并发编排

Java实现CompletionService并发编排消费任务

作者:NigulasiLiu

RocketMQ批量拉取消息,消费端一条一条串行处理导致耗时较高,为了解决这个问题,文章提出使用CompletionService来实现并发处理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

假如出现了这种情况:RocketMQ 批量拉取了 128 条消息,但消费端是一条一条串行处理的。128 条消息,每条 50 毫秒,一轮消费就要 6.4 秒。

可以直接用批量消费吗?

串行消费的瓶颈

RocketMQ 的 setConsumeMessageBatchMaxSize(128) 让你一次拉 128 条消息过来,但如果你还是逐条同步处理,那批量拉取的意义就只剩「减少网络往返」了。处理速度的瓶颈,一条都没解开。

直觉上的解法很简单,交给线程池并发执行嘛。但这里藏着一个很容易忽略的问题。

如果异步线程执行失败了,RocketMQ 的 Broker 是不知道的。主线程已经返回了 CONSUME_SUCCESS,Broker 提交了偏移量,那条失败的消息就丢了。或者更常见的情况,主线程根本不知道哪些子线程成功了、哪些失败了,只能盲目地返回成功或重试。

我们需要一个办法,在主线程中感知每一个子线程的执行结果。全部成功才返回 CONSUME_SUCCESS,有一条失败就返回 RECONSUME_LATER 让 RocketMQ 整体重发。

这不是「能不能并发」的问题,是「并发了能不能控」的问题。

CompletionService,先完成先取结果的编排器

Java 并发包里有一个接口叫 CompletionService,位于 java.util.concurrent,专门干这件事,批量提交异步任务,按完成顺序逐个取结果。

它的唯一实现类是 ExecutorCompletionService,用法就三步。

  1. 提交所有任务到线程池。
  2. 循环取结果,发现失败立即标记。
  3. 全部成功返回 CONSUME_SUCCESS,否则返回 RECONSUME_LATER

代码逻辑如下。

@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
    consumer.setPullInterval(1000);
    consumer.setConsumeMessageBatchMaxSize(128);
    consumer.setPullBatchSize(64);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        log.info("NewBuyBatchMsgListener receive message size: {}", msgs.size());

        CompletionService<Boolean> completionService = new ExecutorCompletionService<>(executor);
        List<Future<Boolean>> futures = new ArrayList<>();

        // 1. 提交所有任务
        msgs.forEach(messageExt -> {
            Callable<Boolean> task = () -> {
                try {
                    OrderCreateRequest orderCreateRequest = JSON.parseObject(JSON.parseObject(messageExt.getBody()).getString("body"), OrderCreateRequest.class);
                    return doNewBuyExecute(orderCreateRequest);
                } catch (Exception e) {
                    log.error("Task failed", e);
                    return false; // 标记失败
                }
            };
            futures.add(completionService.submit(task));
        });

        // 2. 检查结果
        boolean allSuccess = true;
        try {
            for (int i = 0; i < msgs.size(); i++) {
                Future<Boolean> future = completionService.take();
                if (!future.get()) { // 3. 发现一个失败立即终止
                    allSuccess = false;
                    break;
                }
            }
        } catch (Exception e) {
            allSuccess = false;
        }

        // 3. 根据结果返回消费状态
        return allSuccess ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                : ConsumeConcurrentlyStatus.RECONSUME_LATER;
    });
}

128 条消息并发处理,总耗时从 6.4 秒降到最慢那条的耗时,通常几十毫秒就够了。

那么,「CompletionService 底层是怎么做到先完成先取的?」

底层原理,BlockingQueue + QueueingFuture

ExecutorCompletionService 的源码非常精炼,核心就三个成员变量。

private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;

executor 是你传入的线程池,completionQueue 是一个 LinkedBlockingQueue,用来存放已完成任务的 Future 对象。

关键在 submit 方法里。当你调用 completionService.submit(task) 时,它并没有直接把 task 丢给线程池,而是先包装了一层。

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

QueueingFuture 继承自 FutureTask,重写了 done() 方法。done()FutureTask 提供的钩子,任务无论正常完成还是异常终止,都会回调这个方法。

所以整个流程是这样的。你 submit 一个任务,它被包装成 QueueingFuture 交给线程池执行。任务跑完的那一刻,done() 触发,把对应的 Future 塞进 completionQueue。你调 take(),就是从 completionQueue 里阻塞地取一个出来。

谁先完成,谁的 Future 先入队,你就先取到谁的结果。提交顺序和完成顺序解耦了。

我觉得这个设计很漂亮。它没有用任何锁排序、没有用优先队列、没有用回调链,就是最朴素的「生产者往队列里放,消费者从队列里取」。BlockingQueue 天然线程安全,生产消费解耦,简单到几乎不可能出错。

那 CompletableFuture 呢?

Java 8 引入了 CompletableFuture,同样是处理异步任务的利器。它和 CompletionService 解决的问题有重叠,但设计哲学完全不同。

维度CompletionServiceCompletableFuture
引入版本Java 5Java 8
核心机制BlockingQueue,先完成先取回调链,任务间可编排依赖
结果获取take() 阻塞等待下一个完成thenApply() / thenCompose() 非阻塞回调
任务关系批量独立任务,互不依赖可描述 A 完成后执行 B、A 和 B 都完成后执行 C
异常处理在 Future.get() 时抛 ExecutionExceptionexceptionally() / handle() 流式处理
适用场景批量同构任务,只关心结果是否全部成功异步流程编排,任务间有依赖和组合关系

一句话总结,CompletionService 是「批量并发,按完成顺序收结果」,CompletableFuture 是「异步编排,按依赖关系串流程」。

回到我们这个 RocketMQ 批量消费的场景,128 条消息之间没有任何依赖关系,我们只关心「全部成功还是有一个失败」。这就是 CompletionService 的主场。

如果你用 CompletableFuture 来写,也能做,但你需要自己维护一个 CompletableFuture.allOf() 来等全部完成,然后再遍历检查结果。代码更啰嗦,而且 allOf 会等所有任务都完成才能继续,哪怕第 2 条消息就失败了,你也要等剩下 126 条跑完才能返回。CompletionServicetake() 则是逐个检查,发现失败立即 break,省下了不必要的等待。

当然,如果你的场景是「查商品信息,再根据商品查库存和价格,最后组装结果」,任务之间有明确的先后依赖,那 CompletableFuture 的链式编排就比 CompletionService 的队列取值优雅得多。

工具没有好坏,只有合不合适。

并发不是目的,可控才是

串行消费慢,直觉反应是加并发。但加了并发之后,如果主线程无法感知子线程的成败,那并发就不是加速,是埋雷。消息丢了都不知道。

CompletionService 解决的不是「怎么并发」的问题,而是「并发了怎么收场」的问题。它用最朴素的 BlockingQueue 机制,让主线程能按完成顺序逐个检查结果,发现异常立即止损。

我觉得并发编程最难的部分从来不是「怎么让任务跑起来」,而是「跑起来之后怎么确保结果可控」。CompletionService 给了一个很干净的答案。

到此这篇关于Java实现CompletionService并发编排消费任务的文章就介绍到这了,更多相关Java CompletionService并发编排内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文