java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java的Phaser

Java并发工具类Phaser详解

作者:啊几

这篇文章主要介绍了Java并发工具类Phaser详解,Phaser(阶段协同器)是一个Java实现的并发工具类,用于协调多个线程的执行,它提供了一些方便的方法来管理多个阶段的执行,可以让程序员灵活地控制线程的执行顺序和阶段性的执行,需要的朋友可以参考下

前言

Phaser(阶段协同器)是一个Java实现的并发工具类,用于协调多个线程的执行。

它提供了一些方便的方法来管理多个阶段的执行,可以让程序员灵活地控制线程的执行顺序和阶段性的执行。

Phaser可以被视为CyclicBarrierCountDownLatch的进化版,它能够自适应地调整并发线程数,可以动态地增加或减少参与线程的数量。

所以Phaser特别适合使用在重复执行或者重用的情况。

在这里插入图片描述

常用API

构造方法

增减参与任务数方法

到达、等待方法

Phaser使用

多线程批量处理数据

public class PhaserBatchProcessorDemo {

    private final List<String> data;
    private final int batchsize;//一次处理多少数据
    private final int threadCount;//处理的线程数
    private final Phaser phaser;
    private final List<String> processedData;

    public PhaserBatchProcessorDemo(List<String> data,int batchsize,int threadCount){
        this.data = data;
        this.batchsize = batchsize;
        this.threadCount = threadCount;
        this.phaser = new Phaser(1);
        //this.phaser = new Phaser();
        this.processedData = new ArrayList<>();
    }

    public void process() throws InterruptedException {
        for(int i = 0;i<threadCount;i++){
            System.out.println("phaser.register():"+phaser.register());
            new Thread(new BatchProcessor(i)).start();
            Thread.sleep(50);
        }
        phaser.arriveAndDeregister();//主线程执行结束
        System.out.println("结束");
    }

    private class BatchProcessor implements Runnable{
        private final int threadIndex;
        public BatchProcessor(int threadIndex){this.threadIndex = threadIndex;}

        @Override
        public void run() {
            int index = 0;
            while(true){
                //所有线程都到达这个点之前会阻塞
                System.out.println("线程"+threadIndex+"phaser.arriveAndAwaitAdvance1():");
                phaser.arriveAndAwaitAdvance();

                //从未处理数据中找到一个可以处理的批次
                List<String> batch = new ArrayList<>();
                synchronized (data){
                    while (index < data.size()&&batch.size()<batchsize){
                        String d = data.get(index);
                        if(!processedData.contains(d)){
                            batch.add(d);
                            processedData.add(d);
                        }
                        index++;
                    }
                }
                //处理数据
                for(String d:batch){
                    System.out.println("线程"+threadIndex+"处理数据"+d);
                }
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                //所有数据都处理完当前批次之前会阻塞
                System.out.println("线程"+threadIndex+"phaser.arriveAndAwaitAdvance2():");
                phaser.arriveAndAwaitAdvance();
                //所有线程都处理完当前批次并且未处理数据已经处理完之前会阻塞
                if(batch.isEmpty()||index >= data.size()){
                    System.out.println("线程"+threadIndex+"phaser.arriveAndDeregister()"+phaser.arriveAndDeregister());
                    break;
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> data = new ArrayList<>();
        for(int i = 1;i<=15;i++){
            data.add(String.valueOf(i));
        }

        PhaserBatchProcessorDemo processor = new PhaserBatchProcessorDemo(data,4,3);
        processor.process();
    }
}

/**
 * phaser.register():0
 * 线程0phaser.arriveAndAwaitAdvance1():
 * phaser.register():0
 * 线程1phaser.arriveAndAwaitAdvance1():
 * phaser.register():0
 * 线程2phaser.arriveAndAwaitAdvance1():
 * 结束
 * 线程2处理数据9
 * 线程2处理数据10
 * 线程1处理数据5
 * 线程1处理数据6
 * 线程1处理数据7
 * 线程1处理数据8
 * 线程2处理数据11
 * 线程2处理数据12
 * 线程0处理数据1
 * 线程0处理数据2
 * 线程0处理数据3
 * 线程0处理数据4
 * 线程2phaser.arriveAndAwaitAdvance2():
 * 线程0phaser.arriveAndAwaitAdvance2():
 * 线程1phaser.arriveAndAwaitAdvance2():
 * 线程2phaser.arriveAndAwaitAdvance1():
 * 线程1phaser.arriveAndAwaitAdvance1():
 * 线程0phaser.arriveAndAwaitAdvance1():
 * 线程0处理数据13
 * 线程0处理数据14
 * 线程0处理数据15
 * 线程0phaser.arriveAndAwaitAdvance2():
 * 线程1phaser.arriveAndAwaitAdvance2():
 * 线程2phaser.arriveAndAwaitAdvance2():
 * 线程1phaser.arriveAndDeregister()4
 * 线程2phaser.arriveAndDeregister()4
 * 线程0phaser.arriveAndDeregister()4
 */

这里提出一个问题:为什么要给主线程也注册呢?如果不给主线程注册会怎么样呢?

在这里插入图片描述

这里就要提及register() 增加任务数量和Phaser()初始化定义任务数量的区别

register()有一个需要注意的点是,如果主线程执行速度缓慢的话,很有可能在第一个线程已经arrive的时候,第二个线程任务还没增加,导致第一个线程因为parties只有1,而没有阻塞等待就进入下一阶段了。

如果不给主线程注册添加任务,运行结果如下

phaser.register():0
线程0phaser.arriveAndAwaitAdvance1():
线程0处理数据1
线程0处理数据2
线程0处理数据3
线程0处理数据4
phaser.register():1
线程1phaser.arriveAndAwaitAdvance1():
phaser.register():1
线程2phaser.arriveAndAwaitAdvance1():
结束
线程0phaser.arriveAndAwaitAdvance2():
线程0phaser.arriveAndAwaitAdvance1():
线程2处理数据5
线程2处理数据6
线程2处理数据7
线程2处理数据8
线程1处理数据9
线程1处理数据10
线程1处理数据11
线程1处理数据12
线程2phaser.arriveAndAwaitAdvance2():
线程1phaser.arriveAndAwaitAdvance2():
线程2phaser.arriveAndAwaitAdvance1():
线程1phaser.arriveAndAwaitAdvance1():
线程0处理数据13
线程0处理数据14
线程0处理数据15
线程0phaser.arriveAndAwaitAdvance2():
线程0phaser.arriveAndDeregister()4
线程1phaser.arriveAndAwaitAdvance2():
线程2phaser.arriveAndAwaitAdvance2():
线程2phaser.arriveAndDeregister()5
线程1phaser.arriveAndDeregister()5

而Phaser()初始化就定义了parties,会让所有线程都必须到达之前都阻塞才能进入下一阶段。

给主线程也增加一个任务的目的就在于此 如果主线程也有任务,就算主线程执行缓慢,第一个线程也必须阻塞等待主线程在第一阶段之前,把所有线程都start()启动。

阶段性任务:模拟伙伴出游

public class PhaserDemo {
    public static void main(String[] args) {
        final Phaser phaser = new Phaser(){
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                //参与者数量,去除主线程
                int persons = registeredParties - 1;
                switch (phase){
                    case 0:
                        System.out.println("大家都到佘山站了,可以出发去佘山了,人数:"+persons);
                        break;
                    case 1:
                        System.out.println("大家都到佘山了,出发去爬山,人数:"+persons);
                        break;
                    case 2:
                        System.out.println("大家都到山顶了,开始休息,人数:"+persons);
                        break;

                }
                //判断是否只剩下一个主线程,如果是,返回true,代表终止
                return registeredParties ==1;
            }
        };

        phaser.register();
        final PersonTask personTask = new PersonTask();
        //3个全程参加的伙伴
        for(int i = 0;i<3;i++){

            phaser.register();
            new Thread(()->{
                try{
                    personTask.step1Task();
                    phaser.arriveAndAwaitAdvance();

                    personTask.step2Task();
                    phaser.arriveAndAwaitAdvance();

                    personTask.step3Task();
                    phaser.arriveAndAwaitAdvance();

                    personTask.step4Task();
                    phaser.arriveAndDeregister();


                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }).start();
        }
        

        //两个在山腰半路返回
        for(int i = 0;i<2;i++){
            phaser.register();
            new Thread(()->{
                try{
                    personTask.step1Task();
                    phaser.arriveAndAwaitAdvance();

                    personTask.step2Task();
                    phaser.arriveAndAwaitAdvance();

                    personTask.step3Task();
                    System.out.println("伙伴【"+Thread.currentThread().getName()+"】中途山腰返回");
                    phaser.arriveAndDeregister();

                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }).start();
        }

        while (!phaser.isTerminated()) {
            int phase = phaser.arriveAndAwaitAdvance();
            if (phase == 2) {
                //两个在佘山直接会合
                for(int i = 0;i<2;i++){
                    phaser.register();
                    new Thread(()->{
                        try{
                            System.out.println("伙伴【"+Thread.currentThread().getName()+"】中途加入");

                            personTask.step3Task();
                            phaser.arriveAndAwaitAdvance();

                            personTask.step4Task();
                            phaser.arriveAndDeregister();

                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                    }).start();
                }
            }
        }
    }

    static final Random random = new Random();

    static class PersonTask{
        public void step1Task() throws InterruptedException {
            String person = "伙伴【"+Thread.currentThread().getName()+"】";
            System.out.println(person+"从家出发了......");
            Thread.sleep(random.nextInt(5000));
            System.out.println(person+"到达佘山站");
        }

        public void step2Task() throws InterruptedException {
            String person = "伙伴【"+Thread.currentThread().getName()+"】";
            System.out.println(person+"出发去佘山");
            Thread.sleep(random.nextInt(5000));
            System.out.println(person+"到达佘山");
        }

        public void step3Task() throws InterruptedException {
            String person = "伙伴【"+Thread.currentThread().getName()+"】";
            System.out.println(person+"出发去爬山");
            Thread.sleep(random.nextInt(5000));
            System.out.println(person+"到达山顶");
        }

        public void step4Task() throws InterruptedException {
            String person = "伙伴【"+Thread.currentThread().getName()+"】";
            System.out.println(person+"开始休息");
            Thread.sleep(random.nextInt(5000));
            System.out.println(person+"休息结束,下山回家");
        }
    }
}

/**
 * 伙伴【Thread-3】从家出发了......
 * 伙伴【Thread-1】从家出发了......
 * 伙伴【Thread-2】从家出发了......
 * 伙伴【Thread-4】从家出发了......
 * 伙伴【Thread-0】从家出发了......
 * 伙伴【Thread-1】到达佘山站
 * 伙伴【Thread-4】到达佘山站
 * 伙伴【Thread-3】到达佘山站
 * 伙伴【Thread-0】到达佘山站
 * 伙伴【Thread-2】到达佘山站
 * 大家都到佘山站了,可以出发去佘山了,人数:5
 * 伙伴【Thread-3】出发去佘山
 * 伙伴【Thread-2】出发去佘山
 * 伙伴【Thread-4】出发去佘山
 * 伙伴【Thread-1】出发去佘山
 * 伙伴【Thread-0】出发去佘山
 * 伙伴【Thread-1】到达佘山
 * 伙伴【Thread-4】到达佘山
 * 伙伴【Thread-3】到达佘山
 * 伙伴【Thread-0】到达佘山
 * 伙伴【Thread-2】到达佘山
 * 大家都到佘山了,出发去爬山,人数:5
 * 伙伴【Thread-1】出发去爬山
 * 伙伴【Thread-4】出发去爬山
 * 伙伴【Thread-0】出发去爬山
 * 伙伴【Thread-2】出发去爬山
 * 伙伴【Thread-3】出发去爬山
 * 伙伴【Thread-6】中途加入
 * 伙伴【Thread-6】出发去爬山
 * 伙伴【Thread-5】中途加入
 * 伙伴【Thread-5】出发去爬山
 * 伙伴【Thread-2】到达山顶
 * 伙伴【Thread-5】到达山顶
 * 伙伴【Thread-0】到达山顶
 * 伙伴【Thread-4】到达山顶
 * 伙伴【Thread-4】中途山腰返回
 * 伙伴【Thread-6】到达山顶
 * 伙伴【Thread-3】到达山顶
 * 伙伴【Thread-3】中途山腰返回
 * 伙伴【Thread-1】到达山顶
 * 大家都到山顶了,开始休息,人数:5
 * 伙伴【Thread-5】开始休息
 * 伙伴【Thread-2】开始休息
 * 伙伴【Thread-6】开始休息
 * 伙伴【Thread-0】开始休息
 * 伙伴【Thread-1】开始休息
 * 伙伴【Thread-2】休息结束,下山回家
 * 伙伴【Thread-0】休息结束,下山回家
 * 伙伴【Thread-1】休息结束,下山回家
 * 伙伴【Thread-5】休息结束,下山回家
 * 伙伴【Thread-6】休息结束,下山回家
 */

应用场景总结

以下是一些常见的 Phaser 应用场景:

  1. 多线程任务分配:Phaser 可以用于将复杂的任务分配给多个线程执行,并协调线程间的合作。
  2. 多级任务流程:Phaser 可以用于实现多级任务流程,在每一级任务完成后触发下一级任务的开始。
  3. 模拟并行计算:Phaser 可以用于模拟并行计算,协调多个线程间的工作。
  4. 阶段性任务:Phaser 可以用于实现阶段性任务,在每一阶段任务完成后触发下一阶段任务的开始。

到此这篇关于Java并发工具类Phaser详解的文章就介绍到这了,更多相关Java的Phaser内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文