java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java Stream API数据并行处理

Java Stream API中实现数据的并行处理指南

作者:搜罗万相

在 Java Stream API 中,实现数据的并行处理非常简单,核心是通过 ​​parallelStream() ​​​ 方法获取并行流,而非默认的串行流(​​stream()​​),本文给大家介绍了Java Stream API中实现数据的并行处理的操作方法,需要的朋友可以参考下

引言

在 Java Stream API 中,实现数据的并行处理非常简单,核心是通过 ​​parallelStream() ​​​ 方法获取并行流,而非默认的串行流(​​stream()​​)。并行流会自动利用多核 CPU 的优势,将数据分成多个子任务并行执行,从而提升大数据量处理的效率。

一、并行处理的核心原理

二、实现并行处理的步骤

  1. 获取并行流:通过集合的 ​​parallelStream()​​ 方法(或流的 ​​parallel()​​ 方法将串行流转为并行流)。
  2. 执行流操作:与串行流相同的链式操作(过滤、映射、聚合等),底层会自动并行执行。

三、示例代码

1. 基础并行处理(对比串行与并行)

import java.util.Arrays;
import java.util.List;

public class ParallelStreamDemo {
    public static void main(String[] args) {
        // 准备一个大数据量的集合(1000万个整数)
        List<Integer> numbers = Arrays.asList(new Integer[10_000_000]);
        for (int i = 0; i < numbers.size(); i++) {
            numbers.set(i, i);
        }

        // 串行流处理:计算偶数之和
        long start = System.currentTimeMillis();
        long serialSum = numbers.stream()
                .filter(n -> n % 2 == 0)
                .mapToLong(n -> n)
                .sum();
        long serialTime = System.currentTimeMillis() - start;
        System.out.println("串行处理结果:" + serialSum + ",耗时:" + serialTime + "ms");

        // 并行流处理:同样计算偶数之和
        start = System.currentTimeMillis();
        long parallelSum = numbers.parallelStream() // 关键:使用parallelStream()
                .filter(n -> n % 2 == 0)
                .mapToLong(n -> n)
                .sum();
        long parallelTime = System.currentTimeMillis() - start;
        System.out.println("并行处理结果:" + parallelSum + ",耗时:" + parallelTime + "ms");
    }
}

输出(示例)

串行处理结果:24999995000000,耗时:120ms  
并行处理结果:24999995000000,耗时:35ms  // 并行效率更高(依赖CPU核心数)

2. 将串行流转为并行流(​​parallel()​​ 方法)

除了直接使用 ​​parallelStream()​​,还可以通过 ​​parallel()​​ 方法将串行流转换为并行流:

List<String> words = Arrays.asList("apple", "banana", "cherry", "date");

// 串行流 → 转为并行流
long count = words.stream()
        .parallel() // 切换为并行处理
        .filter(word -> word.length() > 5)
        .count();
System.out.println("长度大于5的单词数:" + count); // 输出:2(banana、cherry)

四、注意事项

  1. 线程安全问题
    并行流会多线程执行操作,若流操作中涉及共享变量的修改(如使用 forEach 累加全局变量),可能导致线程安全问题。
    ❌ 错误示例(共享变量不安全):
int[] sum = {0}; // 共享数组
numbers.parallelStream()
       .forEach(n -> sum[0] += n); // 多线程修改sum[0],结果可能不正确

✅ 正确方式(使用线程安全的聚合操作):

long sum = numbers.parallelStream()
                 .mapToLong(n -> n)
                 .sum(); // sum() 内部线程安全
  1. 并非所有场景都适合并行
  1. 自定义并行线程池
    并行流默认使用 Fork/Join 框架的公共线程池(ForkJoinPool.commonPool()),若需自定义线程池,可通过 ForkJoinPool 包装:
import java.util.concurrent.ForkJoinPool;

ForkJoinPool pool = new ForkJoinPool(4); // 自定义4个核心线程的线程池
long sum = pool.submit(() -> 
    numbers.parallelStream()
           .filter(n -> n % 2 == 0)
           .mapToLong(n -> n)
           .sum()
).get(); // 阻塞获取结果
pool.shutdown(); // 关闭线程池

五、总结

合理使用并行流能显著优化数据处理性能,但需根据具体场景评估是否适用。

到此这篇关于Java Stream API中实现数据的并行处理指南的文章就介绍到这了,更多相关Java Stream API数据并行处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文