Java中的Phaser使用详解
作者:小柴林
这篇文章主要介绍了Java中的Phaser使用详解,与其他障碍不同,注册在phaser上进行同步的parties数量可能会随时间变化,任务可以随时进行注册,需要的朋友可以参考下
Java中的Phaser
JDK1.7新特性
可重用的同步屏障,其功能类似于CyclicBarrier和CountDownLatch,但支持更灵活的用法。
- Registration:与其他障碍不同,注册在phaser上进行同步的parties数量可能会随时间变化。任务可以随时进行注册(使用方法register,bulkRegister或建立初始方数目的构造函数形式),也可以选择在任何到达时注销(使用ArcadeAndDeregister)。与大多数基本同步结构一样,registration和deregistration会只影响内部计数。它们不会建立任何进一步的内部簿记,因此任务无法查询它们是否已注册。 (但是,可以通过通过子类来引入这种记帐方法。)
- Synchronization:像CyclicBarrier一样,Phaser可能会反复等待。arriveAndAwaitAdvance方法的作用类似于CyclicBarrier.await。每一代的phaser都有一个关联的相数。相数从零开始,并在所有各方到达移相器时被递增,在达到Integer.MAX_VALUE后回绕为零。通过使用任何注册方可以调用的两种方法,阶段编号的使用可以在到达移相器时以及在等待其他人时独立控制动作。
- Arrival:arrive和arriveAndDeregister方法记录到达。这些方法不会阻塞,但是会返回一个关联的到达phase数。也就是说,到达应用到的相位器的phase数。当给定阶段的最终参与者到达时,将执行可选操作,并且阶段将前进。这些动作由触发相位超前的一方执行,并由覆盖方法onAdvance(int,int)安排,该方法也控制终止。重写此方法类似于为CyclicBarrier提供屏障操作,但比它更具灵活性。
- Waiting:方法awaitAdvance需要一个指示到达阶段编号的参数,并在移相器前进到(或已经处于)另一个阶段时返回。与使用CyclicBarrier的类似构造不同,即使等待线程被中断,方法awaitAdvance仍继续等待。也提供中断版本和超时版本,但是任务中断或超时等待时遇到的异常不会更改相位器的状态。如有必要,通常可以在调用forceTermination之后,在这些异常的处理程序内执行任何关联的恢复。在ForkJoinPool中执行的任务也可以使用相位器,当其他任务被阻止等待阶段前进时,它将确保足够的并行度来执行任务。
- Termination:移相器可能会进入终止状态,可以使用方法isTerminated检查该状态。终止后,所有同步方法将立即返回,而无需等待提前,如负的返回值所示。同样,终止注册的尝试也无效。当onAdvance的调用返回true时,将触发终止。如果注销导致注册方的数量变为零,则默认实现返回true。如下所示,当相位器以固定的迭代次数控制动作时,通常方便的方法是在当前相位数达到阈值时覆盖此方法以终止。方法forceTermination也可用于突然释放等待线程并允许它们终止。
- Tiering:hasers可以分层(即,以树形结构构建)以减少竞争。相反,可以设置具有大量参与方的相位器,否则它们将承受沉重的同步竞争成本,以便子相位器组共享一个公共父级。即使这会招致更大的每次操作开销,也可能会大大提高吞吐量。在分层的相位器树中,将自动管理子相位器及其父级的注册和注销。只要子相位器的注册方数量不为零(在Phaser(Phaser,int)构造函数,寄存器或bulkRegister中确定),子相位器就会向其父级注册。每当调用到达和取消注册的结果而导致注册方的数量为零时,子相位器都会从其父级注销。
- Monitoring:虽然同步方法只能由注册方调用,但是相位器的当前状态可以由任何调用者监视。在任何给定时刻,总共有getRegisteredParties参与者,其中getArrivedParties已到达当前阶段(getPhase)。当其余(getUnarrivedParties)方到达时,该阶段前进。这些方法返回的值可能反映瞬态,因此通常对于同步控制没有用。方法toString以便于非正式监视的形式返回这些状态查询的快照。
用法示例
可以使用Phaser代替CountDownLatch来控制为可变方提供服务的单发操作。典型的习惯用法是将方法设置为首先注册,然后开始操作,然后注销,如下所示:
void runTasks(List<Runnable> tasks) { final Phaser phaser = new Phaser(1); // "1" to register self // create and start threads for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { phaser.arriveAndAwaitAdvance(); // await all creation task.run(); } }.start(); } // allow threads to start and deregister self phaser.arriveAndDeregister(); }
使一组线程针对给定的迭代次数重复执行操作的一种方法是重写onAdvance:
void startTasks(List<Runnable> tasks, final int iterations) { final Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations || registeredParties == 0; } }; phaser.register(); for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); } }.start(); } phaser.arriveAndDeregister(); // deregister self, don't wait }
如果主要任务以后必须等待终止,则它可以重新注册,然后执行类似的循环:
// ... phaser.register(); while (!phaser.isTerminated()) phaser.arriveAndAwaitAdvance();
在确保相位永远不会环绕Integer.MAX_VALUE的情况下,可以使用相关的构造来等待特定的相位编号。例如:
void awaitPhase(Phaser phaser, int phase) { int p = phaser.register(); // assumes caller not already registered while (p < phase) { if (phaser.isTerminated()) // ... deal with unexpected termination else p = phaser.arriveAndAwaitAdvance(); } phaser.arriveAndDeregister(); }
要使用移相器树创建一组n个任务,可以使用以下形式的代码,假设Task类的构造函数接受在构造时注册的Phaser。在调用build(new Task [n],0,n,new Phaser())之后,可以启动这些任务,例如通过提交到池中:
void build(Task[] tasks, int lo, int hi, Phaser ph) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(tasks, i, j, new Phaser(ph)); } } else { for (int i = lo; i < hi; ++i) tasks[i] = new Task(ph); // assumes new Task(ph) performs ph.register() } }
TASKS_PER_PHASER的最佳值主要取决于预期的同步速率。对于极小的每阶段任务主体(因此是高比率),低至4的值可能合适,而对于极大型的任务阶段,则最高可能为数百。实施注意事项:此实施将参与方的最大数量限制为65535。尝试注册其他参与方会导致IllegalStateException。但是,您可以并且应该创建分层的相位器,以容纳任意数量的参与者。
构造方法和公共方法
int arriveAndAwaitAdvance() //当前线程到达屏障后等待满足条件后再向下一个屏障执行,当计数不满足时线程会一直阻塞 int arriveAndDeregister() //使当前线程退出,并使parties值减1 int getPhase() //获取已满足多少次屏障 protected boolean onAdvance(int phase, int registeredParties) //通过新的屏障时被调用,在创建Phaser时子类实现逻辑,返回true使phaser失效 int getRegisteredParties() //获取注册的parties数量 int register() // 每一次调用就动态递增一个parties int bulkRegister(int parties) //多个添加parties int getArrivedParties() // 获取已被使用的parties个数 int getUnarrivedParties() // 获取未被使用的parties个数 int arrive() //使parties个数加一,并且当前线程不用等待其他线程到达屏障;当其他线程计数不足,依然会处于等待状态 int awaitAdvance(int phase) //如果入参phase值和当前getPhase()方法值一致,则等待屏障,否则忽略屏障,当前线程不可被中断 int awaitAdvanceInterruptibly(int phase) //当前线程可被中断,当前的parties不满足入参时跳过屏障 int awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit) //当前线程可被中断,当前的parties不满足入参时跳过屏障或者超过指定时间 void forceTermination() //使phaser屏障功能失效 boolean isTerminated() //判断phaser对象是否呈销毁状态
到此这篇关于Java中的Phaser使用详解的文章就介绍到这了,更多相关Java中的Phaser内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!