Java中的CompletionService批量异步执行详解
作者:Java面试365
前景引入
我们知道线程池可以执行异步任务,同时可以通过返回值Future获取返回值,所以异步任务大多数采用ThreadPoolExecutor+Future,如果存在如下情况,需要从任务一二三中获取返回值后,保存到数据库中,用异步逻辑实现代码应该如下所示。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3); Future<Integer> f1 = executorService.submit(() -> { System.out.println("执行任务一"); return 1; }); Future<Integer> f2 = executorService.submit(() -> { System.out.println("执行任务二"); return 2; }); Future<Integer> f3 = executorService.submit(() -> { System.out.println("执行任务三"); return 3; }); Integer r1 = f1.get(); executorService.execute(()->{ // 省略保存r1操作 System.out.println(r1); }); Integer r2 = f2.get(); executorService.execute(()->{ // 省略保存r2操作 System.out.println(r2); }); Integer r3 = f3.get(); executorService.execute(()->{ // 省略保存r3操作 System.out.println(r3); }); executorService.shutdown(); }
这样写的代码一点毛病没有,逻辑都是正常的,但如果存在任务一查询了比较耗时的操作,由于f1.get是阻塞执行,那么就算任务二和任务三已经返回结果,任务二的返回值和任务三的返回值都是不能保存到数据库的,因为f1.get将主线程阻塞了。
批量异步实现
那可以如何处理呢?可以采用万能的阻塞队列,任务先执行完毕的先入队,这样可以保证其它线程入库的速度不受影响,提高效率。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3); ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(3); Future<Integer> f1 = executorService.submit(() -> { System.out.println("执行任务一"); Thread.sleep(5000); return 1; }); Future<Integer> f2 = executorService.submit(() -> { System.out.println("执行任务二"); return 2; }); Future<Integer> f3 = executorService.submit(() -> { System.out.println("执行任务三"); Thread.sleep(3000); return 3; }); executorService.execute(()->{ try { Integer r1 = f1.get(); // 阻塞队列入队操作 queue.put(r1); System.out.println(r1); } catch (Exception e) { e.printStackTrace(); } }); executorService.execute(()->{ try { Integer r2 = f2.get(); queue.put(r2); System.out.println(r2); } catch (Exception e) { e.printStackTrace(); } }); executorService.execute(()->{ try { Integer r3 = f3.get(); queue.put(r3); System.out.println(r3); } catch (Exception e) { e.printStackTrace(); } }); // 循环次数不要使用queue.size限制,因为不同时刻queue.size值是有可能不同的 for (int i = 0; i <3; i++) { Integer integer = queue.take(); // 省略保存integer操作 executorService.execute(()->{ System.out.println("保存入库=="+integer); }); } executorService.shutdown(); }
产生结果如下
同样的在生产中不建议使用,因为SDK为我们提供了工具类CompletionService,CompletionService内部就维护了一个阻塞队列,唯一与上述代码实现有所区别的是,阻塞队列入库的是Future对象,其余原理类似。
CompletionService
如何创建CompletionService
CompletionService同样是一个接口,其具体实现为ExecutorCompletionService,创建CompletionService对象有两种方式
public ExecutorCompletionService(Executor executor); public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)
CompletionService对象的创建都是需要指定线程池,如果在创建时没有传入阻塞对象,那么会采用默认的LinkedBlockingQueue无界阻塞队列,如果应用到生产可能会产生OOM的情况,这是需要注意的。
CompletionService初体验
CompletionService如何做到批量执行异步任务呢,将上述场景采用CompletionService实现下
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(3); CompletionService completionService = new ExecutorCompletionService(executorService); Future<Integer> f1 = completionService.submit(() -> { System.out.println("执行任务一"); Thread.sleep(5000); return 1; }); Future<Integer> f2 = completionService.submit(() -> { System.out.println("执行任务二"); return 2; }); Future<Integer> f3 = completionService.submit(() -> { System.out.println("执行任务三"); Thread.sleep(3000); return 3; }); for (int i = 0; i <3 ; i++) { Future take = completionService.take(); Integer integer = (Integer) take.get(); executorService.execute(()->{ System.out.println("执行入库=="+integer); }); } executorService.shutdown(); }
CompletionService接口说明
CompletionService的方法不多,使用起来比较简单,方法签名如下
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(3); CompletionService completionService = new ExecutorCompletionService(executorService); Future<Integer> f1 = completionService.submit(() -> { System.out.println("执行任务一"); Thread.sleep(5000); return 1; }); Future<Integer> f2 = completionService.submit(() -> { System.out.println("执行任务二"); return 2; }); Future<Integer> f3 = completionService.submit(() -> { System.out.println("执行任务三"); Thread.sleep(3000); return 3; }); for (int i = 0; i <3 ; i++) { Future take = completionService.take(); Integer integer = (Integer) take.get(); executorService.execute(()->{ System.out.println("执行入库=="+integer); }); } executorService.shutdown(); }
总结
CompletionService主要是去解决无效等待的问题,如果一个耗时较长的任务在执行,那么可以采用这种方式避免无效的等待
CompletionService还能让异步任务的执行结果有序化,先执行完就先进入阻塞队列。
到此这篇关于Java中的CompletionService批量异步执行详解的文章就介绍到这了,更多相关CompletionService批量异步执行内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!