Java的并发编程之CyclicBarrier解析
作者:MC-闰土
概述
CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。
与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。
CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
使用
提供的方法:
//parties表示屏障拦截的线程数量,当屏障撤销时,先执行barrierAction,然后在释放所有线程 public CyclicBarrier(int parties, Runnable barrierAction) //barrierAction默认为null public CyclicBarrier(int parties) /* *当前线程等待直到所有线程都调用了该屏障的await()方法 *如果当前线程不是将到达的最后一个线程,将会被阻塞。解除阻塞的情况有以下几种 * 1)最后一个线程调用await() * 2)当前线程被中断 3)其他正在该CyclicBarrier上等待的线程被中断 4)其他正在该CyclicBarrier上等待的线程超时 5)其他某个线程调用该CyclicBarrier的reset()方法 *如果当前线程在进入此方法时已经设置了该线程的中断状态或者在等待时被中断,将抛出InterruptedException,并且清除当前线程的已中断状态。 *如果在线程处于等待状态时barrier被reset()或者在调用await()时 barrier 被损坏,将抛出 BrokenBarrierException 异常。 *如果任何线程在等待时被中断,则其他所有等待线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。 *如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作(barrierAction),那么在允许其他线程继续运行之前,当前线程将运行该操作。如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。 * *返回值为当前线程的索引,0表示当前线程是最后一个到达的线程 */ public int await() throws InterruptedException, BrokenBarrierException //在await()的基础上增加超时机制,如果超出指定的等待时间,则抛出 TimeoutException 异常。如果该时间小于等于零,则此方法根本不会等待。 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException //将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个BrokenBarrierException。 public void reset()
对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。
使用示例
每个Worker处理矩阵中的一行,在处理完所有的行之前,该线程将一直在屏障处等待。在各个WOrker处理完所有行后,将执行提供的Runnable屏障操作。
class Solver { final int N; //矩阵的行数 final float[][] data; //要处理的矩阵 final CyclicBarrier barrier; //循环屏障 class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run() { while (!done()) { processRow(myRow); //处理指定一行数据 try { barrier.await(); //在屏障处等待直到 } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } public Solver(float[][] matrix) { data = matrix; N = matrix.length; //初始化CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() { public void run() { mergeRows(...); //合并行 } }); for (int i = 0; i < N; ++i) new Thread(new Worker(i)).start(); waitUntilDone(); } }
实现原理
基于ReentrantLock和Condition机制实现。
除了getParties()方法,CyclicBarrier的其他方法都需要获取锁。
CyclicBarrier与CountDownLatch比较
1)CountDownLatch:一个线程(或者多个),等待另外N个线程完成某个事情之后才能执行;CyclicBarrier:N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
2)CountDownLatch:一次性的;CyclicBarrier:可以重复使用。
3)CountDownLatch基于AQS;CyclicBarrier基于锁和Condition。本质上都是依赖于volatile和CAS实现的。
到此这篇关于Java的并发编程之CyclicBarrier解析的文章就介绍到这了,更多相关CyclicBarrier解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!