Java ForkJoinPool线程池的使用之并行计算数组求和实例
作者:学亮编程手记
这篇文章主要介绍了Java ForkJoinPool线程池的使用之并行计算数组求和实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
Java ForkJoinPool线程池的使用之并行计算数组求和
package com.zhangxueliang.juc;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
public class ForkJoinPoolDemo {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();
static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextInt(100);
}
System.out.println("---" + Arrays.stream(nums).sum()); //stream api
}
static class AddTask extends RecursiveAction {
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected void compute() {
if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else {
int middle = start + (end-start)/2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}
}
}
static class AddTaskRet extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
int start, end;
AddTaskRet(int s, int e) {
start = s;
end = e;
}
@Override
protected Long compute() {
if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}
int middle = start + (end-start)/2;
AddTaskRet subTask1 = new AddTaskRet(start, middle);
AddTaskRet subTask2 = new AddTaskRet(middle, end);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
public static void main(String[] args) throws IOException {
/*ForkJoinPool fjp = new ForkJoinPool();
AddTask task = new AddTask(0, nums.length);
fjp.execute(task);*/
ForkJoinPoolDemo temp = new ForkJoinPoolDemo();
ForkJoinPool fjp = new ForkJoinPool();
AddTaskRet task = new AddTaskRet(0, nums.length);
fjp.execute(task);
long result = task.join();
System.out.println(result);
//System.in.read();
}
}
ForkJoinPool 示例代码解析
这段代码演示了 Java 中 ForkJoinPool 框架的使用,展示了两种不同的任务分割方式:
RecursiveAction(无返回值)RecursiveTask(有返回值)
代码结构分析
1. 初始化部分
static int[] nums = new int[1000000]; // 创建包含100万个元素的数组
static final int MAX_NUM = 50000; // 任务分割的阈值
static Random r = new Random(); // 随机数生成器
// 静态初始化块:填充数组并计算总和
static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextInt(100); // 每个元素赋值为0-99的随机数
}
System.out.println("---" + Arrays.stream(nums).sum()); // 使用stream API计算总和作为验证基准
}2. RecursiveAction 实现(无返回值)
static class AddTask extends RecursiveAction {
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected void compute() {
if(end-start <= MAX_NUM) { // 如果任务足够小,直接计算
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else { // 否则分割任务
int middle = start + (end-start)/2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork(); // 异步执行子任务
subTask2.fork();
}
}
}3. RecursiveTask 实现(有返回值)
static class AddTaskRet extends RecursiveTask<Long> {
int start, end;
AddTaskRet(int s, int e) {
start = s;
end = e;
}
@Override
protected Long compute() {
if(end-start <= MAX_NUM) { // 如果任务足够小,直接计算并返回结果
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}
// 分割任务
int middle = start + (end-start)/2;
AddTaskRet subTask1 = new AddTaskRet(start, middle);
AddTaskRet subTask2 = new AddTaskRet(middle, end);
subTask1.fork(); // 异步执行子任务
subTask2.fork();
return subTask1.join() + subTask2.join(); // 合并子任务结果
}
}4. 主方法
public static void main(String[] args) throws IOException {
// 创建ForkJoinPool实例
ForkJoinPool fjp = new ForkJoinPool();
// 创建有返回值的任务
AddTaskRet task = new AddTaskRet(0, nums.length);
// 执行任务
fjp.execute(task);
// 获取并打印结果
long result = task.join();
System.out.println(result);
}关键概念解释
ForkJoinPool:
- Java 7引入的线程池实现
- 使用工作窃取(work-stealing)算法提高并行效率
- 特别适合分治(divide-and-conquer)算法
RecursiveAction:
- 用于不返回结果的任务
- 需要实现
compute()方法 - 示例中的
AddTask只打印结果不返回
RecursiveTask:
- 用于需要返回结果的任务
- 需要实现
compute()方法并返回指定类型 - 示例中的
AddTaskRet返回子数组的和
fork()和join():
fork(): 异步安排任务执行join(): 等待任务完成并获取结果
执行流程
- 初始化一个包含100万个随机数的数组
- 使用Stream API计算总和作为基准
- 创建ForkJoinPool
- 创建AddTaskRet任务,范围是整个数组
- 任务会根据MAX_NUM阈值(50000)不断分割,直到足够小
- 小任务直接计算子数组和
- 合并所有子任务的结果得到最终总和
- 打印结果(应与Stream API计算的结果一致)
使用建议
- 对于计算密集型任务,ForkJoinPool通常比传统线程池更高效
- 任务分割的阈值需要合理设置,太小会导致过多任务创建开销,太大会降低并行度
- 有返回结果需求时使用RecursiveTask,否则使用RecursiveAction
- 注意join()是阻塞调用,会等待任务完成
这段代码很好地展示了ForkJoin框架的分治思想和使用方法,是并行计算数组求和的经典示例。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
