Java并发工具类Phaser详解
作者:啊几
前言
Phaser(阶段协同器)是一个Java实现的并发工具类,用于协调多个线程的执行。
它提供了一些方便的方法来管理多个阶段的执行,可以让程序员灵活地控制线程的执行顺序和阶段性的执行。
Phaser可以被视为CyclicBarrier和CountDownLatch的进化版,它能够自适应地调整并发线程数,可以动态地增加或减少参与线程的数量。
所以Phaser特别适合使用在重复执行或者重用的情况。
常用API
构造方法
- Phaser(): 参与任务数0
- Phaser(int parties) :指定初始参与任务数
- Phaser(Phaser parent) :指定parent阶段器, 子对象作为一个整体加入parent对象, 当子对象中没有参与者时,会自动从parent对象解除注册
- Phaser(Phaser parent,int parties) : 集合上面两个方法
增减参与任务数方法
- int register() 增加一个任务数,返回当前阶段号。
- int bulkRegister(int parties) 增加指定任务个数,返回当前阶段号。
- int arriveAndDeregister() 减少一个任务数,返回当前阶段号。
到达、等待方法
- int arrive() 到达(任务完成),返回当前阶段号。
- int arriveAndAwaitAdvance() 到达后等待其他任务到达,返回到达阶段号。
- int awaitAdvance(int phase) 在指定阶段等待(必须是当前阶段才有效)
- int awaitAdvanceInterruptibly(int phase) 阶段到达触发动作
- int awaitAdvanceInterruptiBly(int phase,long timeout,TimeUnit unit)
- protected boolean onAdvance(int phase,int registeredParties)类似CyclicBarrier的触发命令,通过重写该方法来增加阶段到达动作,该方法返回true将终结Phaser对象。
Phaser使用
多线程批量处理数据
public class PhaserBatchProcessorDemo { private final List<String> data; private final int batchsize;//一次处理多少数据 private final int threadCount;//处理的线程数 private final Phaser phaser; private final List<String> processedData; public PhaserBatchProcessorDemo(List<String> data,int batchsize,int threadCount){ this.data = data; this.batchsize = batchsize; this.threadCount = threadCount; this.phaser = new Phaser(1); //this.phaser = new Phaser(); this.processedData = new ArrayList<>(); } public void process() throws InterruptedException { for(int i = 0;i<threadCount;i++){ System.out.println("phaser.register():"+phaser.register()); new Thread(new BatchProcessor(i)).start(); Thread.sleep(50); } phaser.arriveAndDeregister();//主线程执行结束 System.out.println("结束"); } private class BatchProcessor implements Runnable{ private final int threadIndex; public BatchProcessor(int threadIndex){this.threadIndex = threadIndex;} @Override public void run() { int index = 0; while(true){ //所有线程都到达这个点之前会阻塞 System.out.println("线程"+threadIndex+"phaser.arriveAndAwaitAdvance1():"); phaser.arriveAndAwaitAdvance(); //从未处理数据中找到一个可以处理的批次 List<String> batch = new ArrayList<>(); synchronized (data){ while (index < data.size()&&batch.size()<batchsize){ String d = data.get(index); if(!processedData.contains(d)){ batch.add(d); processedData.add(d); } index++; } } //处理数据 for(String d:batch){ System.out.println("线程"+threadIndex+"处理数据"+d); } try { Thread.sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); } //所有数据都处理完当前批次之前会阻塞 System.out.println("线程"+threadIndex+"phaser.arriveAndAwaitAdvance2():"); phaser.arriveAndAwaitAdvance(); //所有线程都处理完当前批次并且未处理数据已经处理完之前会阻塞 if(batch.isEmpty()||index >= data.size()){ System.out.println("线程"+threadIndex+"phaser.arriveAndDeregister()"+phaser.arriveAndDeregister()); break; } } } } public static void main(String[] args) throws InterruptedException { List<String> data = new ArrayList<>(); for(int i = 1;i<=15;i++){ data.add(String.valueOf(i)); } PhaserBatchProcessorDemo processor = new PhaserBatchProcessorDemo(data,4,3); processor.process(); } } /** * phaser.register():0 * 线程0phaser.arriveAndAwaitAdvance1(): * phaser.register():0 * 线程1phaser.arriveAndAwaitAdvance1(): * phaser.register():0 * 线程2phaser.arriveAndAwaitAdvance1(): * 结束 * 线程2处理数据9 * 线程2处理数据10 * 线程1处理数据5 * 线程1处理数据6 * 线程1处理数据7 * 线程1处理数据8 * 线程2处理数据11 * 线程2处理数据12 * 线程0处理数据1 * 线程0处理数据2 * 线程0处理数据3 * 线程0处理数据4 * 线程2phaser.arriveAndAwaitAdvance2(): * 线程0phaser.arriveAndAwaitAdvance2(): * 线程1phaser.arriveAndAwaitAdvance2(): * 线程2phaser.arriveAndAwaitAdvance1(): * 线程1phaser.arriveAndAwaitAdvance1(): * 线程0phaser.arriveAndAwaitAdvance1(): * 线程0处理数据13 * 线程0处理数据14 * 线程0处理数据15 * 线程0phaser.arriveAndAwaitAdvance2(): * 线程1phaser.arriveAndAwaitAdvance2(): * 线程2phaser.arriveAndAwaitAdvance2(): * 线程1phaser.arriveAndDeregister()4 * 线程2phaser.arriveAndDeregister()4 * 线程0phaser.arriveAndDeregister()4 */
这里提出一个问题:为什么要给主线程也注册呢?如果不给主线程注册会怎么样呢?
这里就要提及register() 增加任务数量和Phaser()初始化定义任务数量的区别:
register()有一个需要注意的点是,如果主线程执行速度缓慢的话,很有可能在第一个线程已经arrive的时候,第二个线程任务还没增加,导致第一个线程因为parties只有1,而没有阻塞等待就进入下一阶段了。
如果不给主线程注册添加任务,运行结果如下
phaser.register():0
线程0phaser.arriveAndAwaitAdvance1():
线程0处理数据1
线程0处理数据2
线程0处理数据3
线程0处理数据4
phaser.register():1
线程1phaser.arriveAndAwaitAdvance1():
phaser.register():1
线程2phaser.arriveAndAwaitAdvance1():
结束
线程0phaser.arriveAndAwaitAdvance2():
线程0phaser.arriveAndAwaitAdvance1():
线程2处理数据5
线程2处理数据6
线程2处理数据7
线程2处理数据8
线程1处理数据9
线程1处理数据10
线程1处理数据11
线程1处理数据12
线程2phaser.arriveAndAwaitAdvance2():
线程1phaser.arriveAndAwaitAdvance2():
线程2phaser.arriveAndAwaitAdvance1():
线程1phaser.arriveAndAwaitAdvance1():
线程0处理数据13
线程0处理数据14
线程0处理数据15
线程0phaser.arriveAndAwaitAdvance2():
线程0phaser.arriveAndDeregister()4
线程1phaser.arriveAndAwaitAdvance2():
线程2phaser.arriveAndAwaitAdvance2():
线程2phaser.arriveAndDeregister()5
线程1phaser.arriveAndDeregister()5
而Phaser()初始化就定义了parties,会让所有线程都必须到达之前都阻塞才能进入下一阶段。
给主线程也增加一个任务的目的就在于此 如果主线程也有任务,就算主线程执行缓慢,第一个线程也必须阻塞等待主线程在第一阶段之前,把所有线程都start()启动。
阶段性任务:模拟伙伴出游
public class PhaserDemo { public static void main(String[] args) { final Phaser phaser = new Phaser(){ @Override protected boolean onAdvance(int phase, int registeredParties) { //参与者数量,去除主线程 int persons = registeredParties - 1; switch (phase){ case 0: System.out.println("大家都到佘山站了,可以出发去佘山了,人数:"+persons); break; case 1: System.out.println("大家都到佘山了,出发去爬山,人数:"+persons); break; case 2: System.out.println("大家都到山顶了,开始休息,人数:"+persons); break; } //判断是否只剩下一个主线程,如果是,返回true,代表终止 return registeredParties ==1; } }; phaser.register(); final PersonTask personTask = new PersonTask(); //3个全程参加的伙伴 for(int i = 0;i<3;i++){ phaser.register(); new Thread(()->{ try{ personTask.step1Task(); phaser.arriveAndAwaitAdvance(); personTask.step2Task(); phaser.arriveAndAwaitAdvance(); personTask.step3Task(); phaser.arriveAndAwaitAdvance(); personTask.step4Task(); phaser.arriveAndDeregister(); }catch (InterruptedException e){ e.printStackTrace(); } }).start(); } //两个在山腰半路返回 for(int i = 0;i<2;i++){ phaser.register(); new Thread(()->{ try{ personTask.step1Task(); phaser.arriveAndAwaitAdvance(); personTask.step2Task(); phaser.arriveAndAwaitAdvance(); personTask.step3Task(); System.out.println("伙伴【"+Thread.currentThread().getName()+"】中途山腰返回"); phaser.arriveAndDeregister(); }catch (InterruptedException e){ e.printStackTrace(); } }).start(); } while (!phaser.isTerminated()) { int phase = phaser.arriveAndAwaitAdvance(); if (phase == 2) { //两个在佘山直接会合 for(int i = 0;i<2;i++){ phaser.register(); new Thread(()->{ try{ System.out.println("伙伴【"+Thread.currentThread().getName()+"】中途加入"); personTask.step3Task(); phaser.arriveAndAwaitAdvance(); personTask.step4Task(); phaser.arriveAndDeregister(); }catch (InterruptedException e){ e.printStackTrace(); } }).start(); } } } } static final Random random = new Random(); static class PersonTask{ public void step1Task() throws InterruptedException { String person = "伙伴【"+Thread.currentThread().getName()+"】"; System.out.println(person+"从家出发了......"); Thread.sleep(random.nextInt(5000)); System.out.println(person+"到达佘山站"); } public void step2Task() throws InterruptedException { String person = "伙伴【"+Thread.currentThread().getName()+"】"; System.out.println(person+"出发去佘山"); Thread.sleep(random.nextInt(5000)); System.out.println(person+"到达佘山"); } public void step3Task() throws InterruptedException { String person = "伙伴【"+Thread.currentThread().getName()+"】"; System.out.println(person+"出发去爬山"); Thread.sleep(random.nextInt(5000)); System.out.println(person+"到达山顶"); } public void step4Task() throws InterruptedException { String person = "伙伴【"+Thread.currentThread().getName()+"】"; System.out.println(person+"开始休息"); Thread.sleep(random.nextInt(5000)); System.out.println(person+"休息结束,下山回家"); } } } /** * 伙伴【Thread-3】从家出发了...... * 伙伴【Thread-1】从家出发了...... * 伙伴【Thread-2】从家出发了...... * 伙伴【Thread-4】从家出发了...... * 伙伴【Thread-0】从家出发了...... * 伙伴【Thread-1】到达佘山站 * 伙伴【Thread-4】到达佘山站 * 伙伴【Thread-3】到达佘山站 * 伙伴【Thread-0】到达佘山站 * 伙伴【Thread-2】到达佘山站 * 大家都到佘山站了,可以出发去佘山了,人数:5 * 伙伴【Thread-3】出发去佘山 * 伙伴【Thread-2】出发去佘山 * 伙伴【Thread-4】出发去佘山 * 伙伴【Thread-1】出发去佘山 * 伙伴【Thread-0】出发去佘山 * 伙伴【Thread-1】到达佘山 * 伙伴【Thread-4】到达佘山 * 伙伴【Thread-3】到达佘山 * 伙伴【Thread-0】到达佘山 * 伙伴【Thread-2】到达佘山 * 大家都到佘山了,出发去爬山,人数:5 * 伙伴【Thread-1】出发去爬山 * 伙伴【Thread-4】出发去爬山 * 伙伴【Thread-0】出发去爬山 * 伙伴【Thread-2】出发去爬山 * 伙伴【Thread-3】出发去爬山 * 伙伴【Thread-6】中途加入 * 伙伴【Thread-6】出发去爬山 * 伙伴【Thread-5】中途加入 * 伙伴【Thread-5】出发去爬山 * 伙伴【Thread-2】到达山顶 * 伙伴【Thread-5】到达山顶 * 伙伴【Thread-0】到达山顶 * 伙伴【Thread-4】到达山顶 * 伙伴【Thread-4】中途山腰返回 * 伙伴【Thread-6】到达山顶 * 伙伴【Thread-3】到达山顶 * 伙伴【Thread-3】中途山腰返回 * 伙伴【Thread-1】到达山顶 * 大家都到山顶了,开始休息,人数:5 * 伙伴【Thread-5】开始休息 * 伙伴【Thread-2】开始休息 * 伙伴【Thread-6】开始休息 * 伙伴【Thread-0】开始休息 * 伙伴【Thread-1】开始休息 * 伙伴【Thread-2】休息结束,下山回家 * 伙伴【Thread-0】休息结束,下山回家 * 伙伴【Thread-1】休息结束,下山回家 * 伙伴【Thread-5】休息结束,下山回家 * 伙伴【Thread-6】休息结束,下山回家 */
应用场景总结
以下是一些常见的 Phaser 应用场景:
- 多线程任务分配:Phaser 可以用于将复杂的任务分配给多个线程执行,并协调线程间的合作。
- 多级任务流程:Phaser 可以用于实现多级任务流程,在每一级任务完成后触发下一级任务的开始。
- 模拟并行计算:Phaser 可以用于模拟并行计算,协调多个线程间的工作。
- 阶段性任务:Phaser 可以用于实现阶段性任务,在每一阶段任务完成后触发下一阶段任务的开始。
到此这篇关于Java并发工具类Phaser详解的文章就介绍到这了,更多相关Java的Phaser内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!