java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java中的Phaser

Java中的Phaser使用详解

作者:小柴林

这篇文章主要介绍了Java中的Phaser使用详解,与其他障碍不同,注册在phaser上进行同步的parties数量可能会随时间变化,任务可以随时进行注册,需要的朋友可以参考下

Java中的Phaser

JDK1.7新特性

可重用的同步屏障,其功能类似于CyclicBarrier和CountDownLatch,但支持更灵活的用法。

用法示例

可以使用Phaser代替CountDownLatch来控制为可变方提供服务的单发操作。典型的习惯用法是将方法设置为首先注册,然后开始操作,然后注销,如下所示:

 void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         phaser.arriveAndAwaitAdvance(); // await all creation
         task.run();
       }
     }.start();
   }
 
   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 }

使一组线程针对给定的迭代次数重复执行操作的一种方法是重写onAdvance:

void startTasks(List<Runnable> tasks, final int iterations) {
   final Phaser phaser = new Phaser() {
     protected boolean onAdvance(int phase, int registeredParties) {
       return phase >= iterations || registeredParties == 0;
     }
   };
   phaser.register();
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         do {
           task.run();
           phaser.arriveAndAwaitAdvance();
         } while (!phaser.isTerminated());
       }
     }.start();
   }
   phaser.arriveAndDeregister(); // deregister self, don't wait
 }

如果主要任务以后必须等待终止,则它可以重新注册,然后执行类似的循环:

// ...
phaser.register();
while (!phaser.isTerminated())
  phaser.arriveAndAwaitAdvance();

在确保相位永远不会环绕Integer.MAX_VALUE的情况下,可以使用相关的构造来等待特定的相位编号。例如:

 void awaitPhase(Phaser phaser, int phase) {
   int p = phaser.register(); // assumes caller not already registered
   while (p < phase) {
     if (phaser.isTerminated())
       // ... deal with unexpected termination
     else
       p = phaser.arriveAndAwaitAdvance();
   }
   phaser.arriveAndDeregister();
 }

要使用移相器树创建一组n个任务,可以使用以下形式的代码,假设Task类的构造函数接受在构造时注册的Phaser。在调用build(new Task [n],0,n,new Phaser())之后,可以启动这些任务,例如通过提交到池中:

 void build(Task[] tasks, int lo, int hi, Phaser ph) {
   if (hi - lo > TASKS_PER_PHASER) {
     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
       int j = Math.min(i + TASKS_PER_PHASER, hi);
       build(tasks, i, j, new Phaser(ph));
     }
   } else {
     for (int i = lo; i < hi; ++i)
       tasks[i] = new Task(ph);
       // assumes new Task(ph) performs ph.register()
   }
 }

TASKS_PER_PHASER的最佳值主要取决于预期的同步速率。对于极小的每阶段任务主体(因此是高比率),低至4的值可能合适,而对于极大型的任务阶段,则最高可能为数百。实施注意事项:此实施将参与方的最大数量限制为65535。尝试注册其他参与方会导致IllegalStateException。但是,您可以并且应该创建分层的相位器,以容纳任意数量的参与者。

构造方法和公共方法

int arriveAndAwaitAdvance()     //当前线程到达屏障后等待满足条件后再向下一个屏障执行,当计数不满足时线程会一直阻塞
int arriveAndDeregister()       //使当前线程退出,并使parties值减1
int getPhase()                  //获取已满足多少次屏障
protected boolean onAdvance(int phase, int registeredParties)  //通过新的屏障时被调用,在创建Phaser时子类实现逻辑,返回true使phaser失效
int getRegisteredParties()      //获取注册的parties数量
int register()                 // 每一次调用就动态递增一个parties
int bulkRegister(int parties)   //多个添加parties
int getArrivedParties()        // 获取已被使用的parties个数
 int getUnarrivedParties()     // 获取未被使用的parties个数
 
int arrive()                    //使parties个数加一,并且当前线程不用等待其他线程到达屏障;当其他线程计数不足,依然会处于等待状态
int awaitAdvance(int phase)     //如果入参phase值和当前getPhase()方法值一致,则等待屏障,否则忽略屏障,当前线程不可被中断
int awaitAdvanceInterruptibly(int phase) //当前线程可被中断,当前的parties不满足入参时跳过屏障
int awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit) //当前线程可被中断,当前的parties不满足入参时跳过屏障或者超过指定时间
void forceTermination()         //使phaser屏障功能失效
 boolean isTerminated()         //判断phaser对象是否呈销毁状态  

到此这篇关于Java中的Phaser使用详解的文章就介绍到这了,更多相关Java中的Phaser内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文