java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > java并行parallelStream

Java中流式并行操作parallelStream的原理和使用方法

作者:NazonaX

本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流时需要注意的线程安全问题和性能问题,并提供了最佳实践建议,感兴趣的朋友跟随小编一起看看吧

Java中流式并行操作parallelStream

0. 问题的产生

某天上线后,发现线上存在一些报错,遂即自己尝试线上操作,但是发现功能正常。追踪相关的报错代码行如下:

PointMissionPO missionPO = missionBO.getDbData();

该行报错为空指针异常,可以从代码中判断唯一能报出空指针异常的位置为missionBO为空,向上追踪该引用:

for (PointMissionBO missionBO : result) {
	// 循环体内容
}

为一个列表List的循环体。于是接着向前追踪引用,发现所有处理该列表引用的地方均使用了stream流式操作。经常使用函数式编程的同学都知道,这很少会出现null对象在列表中。我通体检查了一遍都未发现任何可能产生null对象的插入位置。很奇怪那么这个null对象是如何被加入到列表中的呢?

后来有个小伙伴经过AI上下文分析,给出了可能的位置:

missionByType.entrySet().parallelStream()
                .filter(entry -> !CollectionUtils.isEmpty(entry.getValue()))
                .map(entry -> processMissions(ctmId, hruId, entry.getValue()))
                .forEach(result::addAll);

仔细一看居然使用了parallelStream并行处理,那么什么是parallelStream呢?为什么它会出现错误呢?

1. 什么是parallelStream?

Java 8引入的Stream API提供了两种处理方式:

2. parallelStream的工作原理

parallelStream基于Fork/Join框架实现:

  1. 将数据源分割成多个子任务(fork)
  2. 在不同线程上并行处理这些子任务
  3. 合并结果(join)

3. parallelStream的正确与错误使用示例

错误使用示例(来自我们的测试代码):

List<String> result = new ArrayList<>();
missionByType.entrySet().parallelStream()
        .map(entry -> processMissions(entry.getValue()))
        .forEach(result::addAll);  // 危险操作!

问题分析:

正确使用方式一:使用同步块

List<String> result = new ArrayList<>();
missionByType.entrySet().parallelStream()
        .map(entry -> processMissions(entry.getValue()))
        .filter(processedMissions -> !processedMissions.isEmpty())
        .forEach(processedMissions -> {
            synchronized (result) {
                result.addAll(processedMissions);
            }
        });

正确使用方式二:使用收集器(推荐)

List<String> safeResult = missionByType.entrySet().parallelStream()
        .flatMap(entry -> processMissions(entry.getValue()).stream())
        .collect(Collectors.toList());

4. parallelStream在实际业务中的应用

查看我们项目中的实际应用案例:

// PointBusinessServiceImpl.java 中的实际使用
List<PointMissionBO> result = missionByType.entrySet().parallelStream()
        .filter(entry -> !CollectionUtils.isEmpty(entry.getValue()))
        .flatMap(entry -> processMissions(ctmId, hruId, entry.getValue()).stream())
        .collect(Collectors.toList());

这种方式的优点:

5. parallelStream适用场景与注意事项

适用场景:

  1. 数据量较大(通常万级以上)
  2. 计算密集型操作
  3. 无状态操作(函数式编程)
  4. 不依赖处理顺序的操作

不适用场景:

  1. 数据量小(并行开销可能超过收益)
  2. IO密集型操作
  3. 有状态共享操作
  4. 需要保证处理顺序的场景

注意事项:

  1. 线程安全:避免在并行流中使用非线程安全的对象
  2. 副作用:避免在流操作中修改外部状态
  3. 性能考量:并行不一定比串行快,需根据实际情况评估
  4. 资源竞争:注意共享资源的访问控制

6. 最佳实践建议

  1. 优先考虑collect:使用收集器而不是直接修改共享集合
  2. 避免副作用:确保流操作是无状态的纯函数
  3. 合理选择数据结构:使用适合并行处理的数据结构
  4. 测试性能:在实际环境中测试并行处理效果
  5. 监控资源使用:关注CPU和内存使用情况

以上就是关于parallelStream并行处理的AI输出内容。下面结合实际问题发生的场景进行推演和论述。

7. 其他思考

上面说了这么多,核心重点在于:parallelStream中使用线程不安全的对象操作时会出现异常。那么具体是什么样的异常呢?
其实我们仔细想想非内存安全List的内存管理,熟悉八股文的同学应该一下子就懂了,没错就是内存扩展机制。
当一个非内存安全的List在触发到内存扩展阈值的时候,就会触发一次内存扩展。具体原理就是开辟一个更长的(通常是2倍)列表,然后将原数据赋值到新列表中。这个过程中,如果出现了并行开辟的情况,那赋值的内容以及目标列表就会变得混乱,出现null对象也并非不可能。因此当尝试指定初始列表大小为256的情况下,第0章的错误就自然消失了。

那么也就出现一些关于Collection的使用建议:

  1. 初始化List时,最好能够有一个预估的列表大小并指定,其实所有Collection对象都可以这么做。
  2. 但凡出现非线程安全的Collection对象需要参与并行计算时,都需要注意它的数据正确性应当如何保证,如果无法自行控制,或者控制有难度的,可以考虑使用Concurrent包中的内容。

到此这篇关于Java中流式并行操作parallelStream的原理和使用方法的文章就介绍到这了,更多相关java并行parallelStream内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文