java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java使用ForkJoinPool

Java中使用ForkJoinPool的实现示例

作者:BillDavidup

ForkJoinPool是一个功能强大的Java类,用于处理计算密集型任务,本文主要介绍了Java中使用ForkJoinPool的实现示例,具有一定的参考价值,感兴趣的可以了解一下

背景

使用ForkJoinPool去分解计算密集型任务且且并行地执行他们以获得更好的Java应用程序的性能。

ForkJoinPool是一个功能强大的Java类,用于处理计算密集型任务。它的工作原理是将任务分解为更小的子任务,然后并行执行。该线程池使用分而治之策略进行操作,使其能够并发执行任务,从而提高吞吐量并减少处理时间。

ForkJoinPool的一个独特的特点是它的工作窃取算法,用它来优化性能。当工作线程完成其分配的任务时,它将从其他线程窃取任务,确保所有线程都能高效工作,并且不会浪费计算机资源。

ForkJoinPool广泛用于Java的并行Stream流和CompletableFutures,使开发人员能够轻松地同时执行任务。此外,其他JVM语言(如Kotlin和Akka)使用此框架来构建需要高并发性和弹性的消息驱动应用程序。

1.使用ForkJoinPool的线程池

ForkJoinPool 类存储工作线程Worker,Worker是在机器的每个CPU核心上运行的进程。每个进程都存储在deque中,deque代表双端队列。一旦工作线程的任务用完,它就开始从其他工作线程窃取任务。

首先,将有一个分叉任务的过程;这意味着一个大任务将被分解为可以并行执行的较小任务。所有子任务完成后,它们将重新加入。然后ForkJoinPool类提供一个结果,如图1所示。

当任务被提交到 ForkJoinPool 中,该进程将被划分为较小的进程,并推送到共享队列中。

一旦调用了fork()方法,就会并行调用任务,直到基本条件为true。一旦处理被分叉,join()方法确保线程相互等待,直到进程完成。

最初,所有任务都将提交到一个主队列,该主队列将把任务推送到工作线程。请注意,任务是使用LIFO(后进先出)策略插入的,该策略与堆栈数据结构相同。

另一个重要的点是ForkJoinPool使用deques来存储任务。这提供了使用后进先出或先进先出(先进先出)的能力,这对于窃取工作的算法是必要的。

2.工作窃取算法

ForkJoinPool中的工作窃取是一种有效的算法,通过平衡池中所有可用线程的工作负载,可以有效地使用计算机资源。

当一个线程变为空闲时,它将尝试从仍忙于分配工作的其他线程中窃取任务,而不是保持非活动状态。这个过程最大限度地利用了计算资源,并确保没有线程负担过重,而其他线程保持空闲。工作窃取算法背后的关键概念是,每个线程都有自己的任务组,并按后进先出的顺序执行。当一个线程完成自己的任务并变为空闲时,它将尝试从另一个线程的deque的末尾“窃取”任务,遵循FIFO策略,与队列数据结构相同。这允许空闲线程拾取等待时间最长的任务,从而减少总体等待时间并提高吞吐量。

在下图中,线程2通过轮询线程1的deque中的最后一个元素,从线程1中窃取一个任务,然后执行该任务。被盗任务通常是deque中最古老的任务,这确保了工作负载在池中的所有线程之间均匀分布。

总的来说,ForkJoinPool的工作窃取算法是一个强大的功能,它可以通过确保所有可用的计算资源都得到有效利用来显著提高并行应用程序的性能。

3.ForkJoinPool的主要类

让我们快速了解一下支持使用ForkJoinPool进行处理的主要类。

4.使用递归操作

要使用RecursiveAction功能,我们需要继承它并重写compute()方法。然后,我们用想要实现的逻辑创建子任务。

在下面的代码示例中,我们将以并行和递归的方式计算数组中每个数字的两倍。我们被限制为并行计算二乘二的数组元素。

正如您所看到的,fork()方法调用compute()方法。一旦整个数组的每个元素都得到了和,递归调用就会停止。一旦递归地求和了数组的所有元素,我们就会显示结果。

Listing 1. An example of RecursiveAction

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class ForkJoinDoubleAction {
  public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    int[] array = {1, 5, 10, 15, 20, 25, 50};
    DoubleNumber doubleNumberTask = new DoubleNumber(array, 0, array.length);
    // Invokes compute method
    forkJoinPool.invoke(doubleNumberTask);
    System.out.println(DoubleNumber.result);
  }
}
class DoubleNumber extends RecursiveAction {
  final int PROCESS_THRESHOLD = 2;
  int[] array;
  int startIndex, endIndex;
  static int result;
  DoubleNumber(int[] array, int startIndex, int endIndex) {
    this.array = array;
    this.startIndex = startIndex;
    this.endIndex = endIndex;
  }
  @Override
  protected void compute() {
    if (endIndex - startIndex <= PROCESS_THRESHOLD) {
      for (int i = startIndex; i < endIndex; i++) {
        result += array[i] * 2;
      }
    } else {
      int mid = (startIndex + endIndex) / 2;
      DoubleNumber leftArray = new DoubleNumber(array, startIndex, mid);
      DoubleNumber rightArray = new DoubleNumber(array, mid, endIndex);
      // Invokes the compute method recursively
      leftArray.fork();
      rightArray.fork();
      // Joins results from recursive invocations
      leftArray.join();
      rightArray.join();
    }
  }
}

该计算的输出为252。

RecursiveAction需要记住的一点是,它不会返回值。通过使用分而治之的策略来提高性能,也可以打破这个过程。

这就是我们在清单1中所做的,我们不是计算每个数组元素的二重,而是通过将数组分解为多个部分来并行计算。

同样重要的是要注意,RecursiveAction在用于可以有效分解为较小子问题的任务时最有效。因此,RecursiveAction和ForkJoinPool应该用于计算密集型任务,在这些任务中,工作的并行化可以显著提高性能。否则,由于线程的创建和管理,性能将更差。

5.资源任务

在下一个示例中,让我们研究一个简单的程序,它递归地在中间中断,直到达到基本条件。在本例中,我们使用的是RecursiveTask类。RecursiveAction和RecursiveTask的区别在于,使用递归任务,我们可以在compute()方法中返回一个值。Listing 2. An example of RecursiveTask

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ForkJoinSumArrayTask extends RecursiveTask<Integer> {
  private final List<Integer> numbers;
  public ForkJoinSumArrayTask(List<Integer> numbers) {
    this.numbers = numbers;
  }
  @Override
  protected Integer compute() {
    if (numbers.size() <= 2) {
      return numbers.stream().mapToInt(e -> e).sum();
    } else {
      int mid = numbers.size() / 2;
      List<Integer> list1 = numbers.subList(0, mid);
      List<Integer> list2 = numbers.subList(mid, numbers.size());
      ForkJoinSumArrayTask task1 = new ForkJoinSumArrayTask(list1);
      ForkJoinSumArrayTask task2 = new ForkJoinSumArrayTask(list2);
      task1.fork();
      return task1.join() + task2.compute();
    }
  }
  public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    List<Integer> numbers = List.of(1, 3, 5, 7, 9);
    int output = forkJoinPool.invoke(new ForkJoinSumArrayTask(numbers));
    System.out.println(output);
  }
}

在这里,我们递归地分解中间的数组,直到它达到基本条件。

一旦我们破坏了主数组,我们将list1和list2发送到ForkJoinsMarrayTask,然后我们分叉task1,它并行执行compute()方法和数组的其他部分。

一旦递归过程达到基本条件,就会调用连接方法,连接结果。这种情况下的输出为25。

6.何时使用ForkJoinPool

ForkJoinPool不应该在所有情况下都使用。如前所述,最好将其用于高度密集的并发进程。让我们具体看看这些情况是什么:

7.总结

在本文中,您了解了如何使用最重要的ForkJoinPool功能在单独的CPU核心中执行繁重的操作。让我们以这篇文章的要点作为结论:

到此这篇关于Java中使用ForkJoinPool的实现示例的文章就介绍到这了,更多相关Java使用ForkJoinPool内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文