java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java并行流(Parallel Stream)

Java之并行流(Parallel Stream)使用详解

作者:wujiada001

Java并行流(ParallelStream)通过多线程并行处理集合数据,利用Fork/Join框架加速计算,适用于大规模数据集和计算密集型任务,并行流主要通过集合的parallelStream()方法或现有流的parallel()方法创建,适用于数据量大、计算复杂且无状态操作的场景

Java并行流(Parallel Stream)

并行流是Java 8引入的高效处理集合数据的工具,通过多线程加速计算。

以下是其核心概念、使用方法及注意事项的详细指南:

1. 核心概念与原理

2. 创建并行流的方式

List<Integer> list = Arrays.asList(1, 2, 3, 4);

// 方式1:直接生成并行流
Stream<Integer> parallelStream1 = list.parallelStream();

// 方式2:将顺序流转为并行
Stream<Integer> parallelStream2 = list.stream().parallel();

3. 适用场景与性能优化

推荐场景

性能陷阱

4. 注意事项与最佳实践

避免共享可变状态

并行操作中修改共享变量会导致线程安全问题,应使用无状态操作或同步控制。

// 错误示例:线程不安全的累加
List<Integer> nums = Arrays.asList(1, 2, 3);
int[] sum = {0};
nums.parallelStream().forEach(n -> sum += n); // 结果可能错误

// 正确做法:使用归约
int safeSum = nums.parallelStream().reduce(0, Integer::sum);

谨慎使用有状态操作

sorted()distinct()在并行流中可能更耗时,需合并线程结果。

// 并行排序(可能比顺序流慢)
List<Integer> sortedList = nums.parallelStream().sorted().toList();

数据源的可拆分性

顺序敏感操作

使用forEachOrdered保证顺序,但牺牲性能。

// 按顺序输出(性能低于无序操作)
list.parallelStream().forEachOrdered(System.out::println);

配置线程池

默认线程数:

Runtime.getRuntime().availableProcessors()

修改全局线程数:

# JVM启动参数
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8

5. 性能对比示例

// 顺序流 vs 并行流(处理1000万数据)
List<Long> numbers = LongStream.rangeClosed(1, 10_000_000)
                               .boxed().collect(Collectors.toList());

// 顺序流耗时
long start = System.currentTimeMillis();
long seqSum = numbers.stream().mapToLong(n -> n * 2).sum();
System.out.println("顺序流耗时: " + (System.currentTimeMillis() - start) + "ms");

// 并行流耗时
start = System.currentTimeMillis();
long parSum = numbers.parallelStream().mapToLong(n -> n * 2).sum();
System.out.println("并行流耗时: " + (System.currentTimeMillis() - start) + "ms");

典型结果(8核CPU):

顺序流耗时: 120ms 并行流耗时: 35ms

总结

优势:简化多线程编程,提升大数据处理效率。

局限:不适合小数据量、顺序敏感或低计算量任务。

最佳实践

通过合理使用并行流,可在不增加复杂代码的情况下显著提升程序性能,但需结合场景权衡利弊。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文