Java中的Fork/Join框架使用详解
作者:帅气的喵喵
这篇文章主要介绍了Java中的Fork/Join框架使用详解,Fork/Join 框架:就是在必要的情况下,将一个大任务,进行<BR>拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个<BR>的小任务运算的结果进行 join 汇总,需要的朋友可以参考下
意义
Fork/Join 框架:就是在必要的情况下,将一个大任务,进行 拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个 的小任务运算的结果进行 join 汇总
Fork/Join 框架与传统线程池的区别
采用 “工作窃取”模式(work-stealing): 当执行新的任务时它可以将其拆分分成更小的任务执行, 并将小任务加到线程队列中,然后再从一个随机线程的队 列中偷一个并把它放在自己的队列中。
相对于一般的线 程池实现,fork/join框架的优势体现在对其中包含的任务的 处理方式上.在一般的线程池中,如果一个线程正在执行的 任务由于某些原因无法继续运行,那么该线程会处于等待 状态、
而在fork/join框架实现中,如果某个子问题由于等 待另外一个子问题的完成而无法继续运行.那么处理该子 问题的线程会主动寻找其他尚未运行的子问题来执行.这 种方式减少了线程的等待时间,提高了性能.
使用
ForkJoinTask:我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。
该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集成 ForkJoinTask 类,只需要继承它的子类,
- RecursiveAction:用于没有返回结果的任务
- RecursiveTask:用于有返回结果的任务
- ForkJoinPool:ForkJoinTask 需要通过 ForkJoinPool 来执行
- RecursiveTask: 继承后可以实现递归(自己调自己)调用的任务
class MyTask extends RecursiveTask<Integer> { //拆分差值不能超过10,计算10以内运算 private static final Integer VALUE = 10; private int begin ;//拆分开始值 private int end;//拆分结束值 private int result ; //返回结果 //创建有参数构造 public MyTask(int begin,int end) { this.begin = begin; this.end = end; } //拆分和合并过程 @Override protected Integer compute() { //判断相加两个数值是否大于10 if((end-begin)<=VALUE) { //相加操作 for (int i = begin; i <=end; i++) { result = result+i; } } else {//进一步拆分 //获取中间值 int middle = begin+(end- begin)/2; //拆分左边 MyTask task01 = new MyTask(begin,middle); //拆分右边 MyTask task02 = new MyTask(middle+1,end); //调用方法拆分 task01.fork(); task02.fork(); //合并结果 result = task01.join()+task02.join(); } return result; } } public class ForkJoinDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { //创建MyTask对象 MyTask myTask = new MyTask(0,100); //创建分支合并池对象 ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask); //获取最终合并之后结果 Integer result = forkJoinTask.get(); System.out.println(result); //关闭池对象 forkJoinPool.shutdown(); } }
package com.yxj.java8; import java.util.concurrent.RecursiveTask; public class ForkJoinCalculate extends RecursiveTask<Long>{ /** * */ private static final long serialVersionUID = 13475679780L; private long start; private long end; private static final long THRESHOLD = 10000L; //临界值 public ForkJoinCalculate(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { long length = end - start; if(length <= THRESHOLD){ long sum = 0; for (long i = start; i <= end; i++) { sum += i; } return sum; }else{ long middle = (start + end) / 2; ForkJoinCalculate left = new ForkJoinCalculate(start, middle); left.fork(); //拆分,并将该子任务压入线程队列 ForkJoinCalculate right = new ForkJoinCalculate(middle+1, end); right.fork(); return left.join() + right.join(); } } }
package com.yxj.java8; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; import org.junit.Test; public class TestForkJoin { @Test public void test1(){ long start = System.currentTimeMillis(); ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinCalculate(0L, 10000000000L); long sum = pool.invoke(task); System.out.println(sum); long end = System.currentTimeMillis(); System.out.println("耗费的时间为: " + (end - start)); //112-1953-1988-2654-2647-20663-113808 } @Test public void test2(){ long start = System.currentTimeMillis(); long sum = 0L; for (long i = 0L; i <= 10000000000L; i++) { sum += i; } System.out.println(sum); long end = System.currentTimeMillis(); System.out.println("耗费的时间为: " + (end - start)); //34-3174-3132-4227-4223-31583 } @Test public void test3(){ long start = System.currentTimeMillis(); Long sum = LongStream.rangeClosed(0L, 10000000000L) .parallel() .sum(); System.out.println(sum); long end = System.currentTimeMillis(); System.out.println("耗费的时间为: " + (end - start)); //2061-2053-2086-18926 } }
到此这篇关于Java中的Fork/Join框架使用详解的文章就介绍到这了,更多相关Fork/Join框架内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!