java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java ForkJoinPool线程池的使用之并行计算数组求和

Java ForkJoinPool线程池的使用之并行计算数组求和实例

作者:学亮编程手记

这篇文章主要介绍了Java ForkJoinPool线程池的使用之并行计算数组求和实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

Java ForkJoinPool线程池的使用之并行计算数组求和

package com.zhangxueliang.juc;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class ForkJoinPoolDemo {
	static int[] nums = new int[1000000];
	static final int MAX_NUM = 50000;
	static Random r = new Random();
	
	static {
		for(int i=0; i<nums.length; i++) {
			nums[i] = r.nextInt(100);
		}
		
		System.out.println("---" + Arrays.stream(nums).sum()); //stream api
	}
	

	static class AddTask extends RecursiveAction {

		int start, end;

		AddTask(int s, int e) {
			start = s;
			end = e;
		}

		@Override
		protected void compute() {

			if(end-start <= MAX_NUM) {
				long sum = 0L;
				for(int i=start; i<end; i++) sum += nums[i];
				System.out.println("from:" + start + " to:" + end + " = " + sum);
			} else {

				int middle = start + (end-start)/2;

				AddTask subTask1 = new AddTask(start, middle);
				AddTask subTask2 = new AddTask(middle, end);
				subTask1.fork();
				subTask2.fork();
			}


		}

	}

	
	static class AddTaskRet extends RecursiveTask<Long> {
		
		private static final long serialVersionUID = 1L;
		int start, end;
		
		AddTaskRet(int s, int e) {
			start = s;
			end = e;
		}

		@Override
		protected Long compute() {
			
			if(end-start <= MAX_NUM) {
				long sum = 0L;
				for(int i=start; i<end; i++) sum += nums[i];
				return sum;
			} 
			
			int middle = start + (end-start)/2;
			
			AddTaskRet subTask1 = new AddTaskRet(start, middle);
			AddTaskRet subTask2 = new AddTaskRet(middle, end);
			subTask1.fork();
			subTask2.fork();
			
			return subTask1.join() + subTask2.join();
		}
		
	}
	
	public static void main(String[] args) throws IOException {
		/*ForkJoinPool fjp = new ForkJoinPool();
		AddTask task = new AddTask(0, nums.length);
		fjp.execute(task);*/

		ForkJoinPoolDemo temp = new ForkJoinPoolDemo();

		ForkJoinPool fjp = new ForkJoinPool();
		AddTaskRet task = new AddTaskRet(0, nums.length);
		fjp.execute(task);
		long result = task.join();
		System.out.println(result);
		
		//System.in.read();
		
	}
}

ForkJoinPool 示例代码解析

这段代码演示了 Java 中 ForkJoinPool 框架的使用,展示了两种不同的任务分割方式:

代码结构分析

1. 初始化部分

static int[] nums = new int[1000000];  // 创建包含100万个元素的数组
static final int MAX_NUM = 50000;     // 任务分割的阈值
static Random r = new Random();        // 随机数生成器

// 静态初始化块:填充数组并计算总和
static {
    for(int i=0; i<nums.length; i++) {
        nums[i] = r.nextInt(100);  // 每个元素赋值为0-99的随机数
    }
    System.out.println("---" + Arrays.stream(nums).sum()); // 使用stream API计算总和作为验证基准
}

2. RecursiveAction 实现(无返回值)

static class AddTask extends RecursiveAction {
    int start, end;
    
    AddTask(int s, int e) {
        start = s;
        end = e;
    }

    @Override
    protected void compute() {
        if(end-start <= MAX_NUM) {  // 如果任务足够小,直接计算
            long sum = 0L;
            for(int i=start; i<end; i++) sum += nums[i];
            System.out.println("from:" + start + " to:" + end + " = " + sum);
        } else {  // 否则分割任务
            int middle = start + (end-start)/2;
            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);
            subTask1.fork();  // 异步执行子任务
            subTask2.fork();
        }
    }
}

3. RecursiveTask 实现(有返回值)

static class AddTaskRet extends RecursiveTask<Long> {
    int start, end;
    
    AddTaskRet(int s, int e) {
        start = s;
        end = e;
    }

    @Override
    protected Long compute() {
        if(end-start <= MAX_NUM) {  // 如果任务足够小,直接计算并返回结果
            long sum = 0L;
            for(int i=start; i<end; i++) sum += nums[i];
            return sum;
        } 
        
        // 分割任务
        int middle = start + (end-start)/2;
        AddTaskRet subTask1 = new AddTaskRet(start, middle);
        AddTaskRet subTask2 = new AddTaskRet(middle, end);
        subTask1.fork();  // 异步执行子任务
        subTask2.fork();
        
        return subTask1.join() + subTask2.join();  // 合并子任务结果
    }
}

4. 主方法

public static void main(String[] args) throws IOException {
    // 创建ForkJoinPool实例
    ForkJoinPool fjp = new ForkJoinPool();
    
    // 创建有返回值的任务
    AddTaskRet task = new AddTaskRet(0, nums.length);
    
    // 执行任务
    fjp.execute(task);
    
    // 获取并打印结果
    long result = task.join();
    System.out.println(result);
}

关键概念解释

ForkJoinPool:

RecursiveAction:

RecursiveTask:

fork()和join():

执行流程

  1. 初始化一个包含100万个随机数的数组
  2. 使用Stream API计算总和作为基准
  3. 创建ForkJoinPool
  4. 创建AddTaskRet任务,范围是整个数组
  5. 任务会根据MAX_NUM阈值(50000)不断分割,直到足够小
  6. 小任务直接计算子数组和
  7. 合并所有子任务的结果得到最终总和
  8. 打印结果(应与Stream API计算的结果一致)

使用建议

  1. 对于计算密集型任务,ForkJoinPool通常比传统线程池更高效
  2. 任务分割的阈值需要合理设置,太小会导致过多任务创建开销,太大会降低并行度
  3. 有返回结果需求时使用RecursiveTask,否则使用RecursiveAction
  4. 注意join()是阻塞调用,会等待任务完成

这段代码很好地展示了ForkJoin框架的分治思想和使用方法,是并行计算数组求和的经典示例。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文