Java并发工具之CyclicBarrier使用详解
作者:warybee
1、CyclicBarrier简介
CyclicBarrier是一个同步器,允许一组线程相互之间等待,直到到达某个公共屏障点 (common barrier point),再继续执行。
因为CyclicBarrier 的计数器是可以循环利用的,所以称它为循环(Cyclic) 的 Barrier。
CyclicBarrier常用于多线程计算数据,当所有线程都完成执行后,在CyclicBarrier回调线程中合并计算。
2、使用介绍
CyclicBarrier默认的构造方法是CyclicBarrier(int parties)其参数表示屏障(barrier )拦截的线程数 量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
public class CyclicBarrierDemo { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } //在子线程输出1 System.out.println("1"); } }).start(); try { c.await(); //在主线程输出2 System.out.println("2"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }
因为主线程和子线程的调度是由CPU决定的,两个线程都有可能先执行,所以既有可能先执行System.out.println("1");,也有可能先执行 System.out.println("2");。
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),则主线程和子线程会永远等待, 因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个 线程都不会继续执行。
此外CyclicBarrier还可以设置回调函数,它是一个 Runnable 实例,用于在线程到达屏障时,优先执行 Runnable 实例,可以处理更复杂的业务场景。
public class CyclicBarrierDemo { //实例化CyclicBarrier,并指定回调函数 static CyclicBarrier c = new CyclicBarrier(2,new TestRunable()); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("1"); } }).start(); try { c.await(); System.out.println("2"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } static class TestRunable implements Runnable{ @Override public void run() { System.out.println("3"); } } }
输出结果:
312
CyclicBarrier方法
- await() 该方法被调用时表示当前线程已经到达公共屏障点 (common barrier point),当前线程阻塞进入休眠状态,直到所有线程都到达屏障点,当前线程才会被唤醒。
- await(long timeout, TimeUnit unit) await()的重载方法,可以指定阻塞时长;
- getParties() 返回参与相互等待的线程数;
- isBroken() 判断此屏障是否处于中断状态。如果因为构造或最后一次重置而导致中断或超时,从而使一个或多个参与者摆脱此屏障点,或者因为异常而导致某个屏障操作失败,则返回true;否则返回false;
- reset() 将屏障重置为其初始状态;
- getNumberWaiting() 返回当前在屏障处等待的参与者数目,此方法主要用于调试。
3、使用案例
如果项目中有这样一个需求,计算每个客户指定年份消费总金额,然后对所有年份金额进行汇总。我们可以使用一个固定数量的线程计算每年消费总金额,将相应的结果存储在一个集合中,当所有线程完成执行其操作时,在CyclicBarrier回调线程中进行汇总以及显示。代码如下:
public class UserConsumeInfo { private String userName; private List<UserAggreInfo> userAggreInfos; //省略必要的get set 方法...... } public class UserAggreInfo { //年份 private Integer year; //消费金额 private Integer amount; public UserAggreInfo(Integer year, Integer amount) { this.year = year; this.amount = amount; } //省略必要的get set 方法...... }
public class CyclicBarrierTest { private CyclicBarrier cyclicBarrier; //保存客户消费数据 private List<UserConsumeInfo> partialResults = new CopyOnWriteArrayList<>(); private Random random = new Random(); //要统计的年份 private List<Integer> years; //统计的用户 private List<String> users; /** * 构造函数 * @param years * @param users */ public CyclicBarrierTest(List<Integer> years, List<String> users) { this.years = years; this.users = users; } /** * 开始计算用户消费信息 */ public void startCalc(){ cyclicBarrier = new CyclicBarrier(users.size(), new AggregatorThread()); for (int i = 0; i < users.size(); i++) { Thread worker = new Thread(new CalcUserAmountThread(users.get(i))); //把用户设置为当前线程名称 worker.setName(users.get(i)); worker.start(); } } /** * 计算用户消费金额线程 */ final class CalcUserAmountThread implements Runnable{ //用户名 private String userName; public CalcUserAmountThread(String userName) { this.userName = userName; } @Override public void run() { UserConsumeInfo userConsumeInfo=new UserConsumeInfo(); userConsumeInfo.setUserName(userName); //计算每年的消费金额 List<UserAggreInfo> userAggreInfos=new ArrayList<>(); for (int i = 0; i < years.size(); i++) { Integer num = random.nextInt(1000); System.out.println("用户:"+userName+" "+years.get(i)+"年,消费:"+num+"元"); userAggreInfos.add(new UserAggreInfo(years.get(i),num)); } userConsumeInfo.setUserAggreInfos(userAggreInfos); partialResults.add(userConsumeInfo); try { System.out.println("用户:"+userName + "到达barrier,等待其他用户."); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { } } } /** * 聚合线程,算出用户总的消费金额 */ class AggregatorThread implements Runnable { @Override public void run() { System.out.println("=====统计用户消费信息======="); int sum = 0;//所有用户消费金额 for (UserConsumeInfo user : partialResults) { String userName = user.getUserName(); System.out.println("用户:"+userName+"消费信息如下:"); for (UserAggreInfo aggre : user.getUserAggreInfos()) { System.out.print("\t"+aggre.getYear()+"年,"+"消费"+aggre.getAmount()+"元"+"\t"); sum+=aggre.getAmount(); } System.out.println(); } System.out.println("所有用户消费总金额:"+sum); } } }
测试:
public static void main(String[] args) { List<String> userResults=new ArrayList<>(); userResults.add("张三"); userResults.add("李四"); userResults.add("王二"); userResults.add("小米"); userResults.add("小明"); userResults.add("小红"); userResults.add("小张"); List<Integer> yearResults=new ArrayList<>(); yearResults.add(2018); yearResults.add(2019); yearResults.add(2020); CyclicBarrierTest demo = new CyclicBarrierTest(yearResults,userResults); demo.startCalc(); }
输出结果:
用户:小明 2018年,消费:310元
用户:小红 2018年,消费:589元
用户:李四 2018年,消费:557元
用户:小米 2018年,消费:946元
用户:张三 2018年,消费:150元
用户:王二 2018年,消费:17元
用户:小张 2018年,消费:228元
用户:小明 2019年,消费:29元
用户:王二 2019年,消费:741元
用户:小张 2019年,消费:380元
用户:王二 2020年,消费:650元
用户:小红 2019年,消费:412元
用户:李四 2019年,消费:453元
用户:王二到达barrier,等待其他用户.
用户:小明 2020年,消费:582元
用户:小米 2019年,消费:524元
用户:张三 2019年,消费:691元
用户:小米 2020年,消费:777元
用户:小明到达barrier,等待其他用户.
用户:李四 2020年,消费:262元
用户:小红 2020年,消费:631元
用户:小张 2020年,消费:34元
用户:小红到达barrier,等待其他用户.
用户:李四到达barrier,等待其他用户.
用户:小米到达barrier,等待其他用户.
用户:张三 2020年,消费:802元
用户:小张到达barrier,等待其他用户.
用户:张三到达barrier,等待其他用户.
=====统计用户消费信息=======
用户:王二消费信息如下:
2018年,消费17元 2019年,消费741元 2020年,消费650元 共消费:1408
用户:小明消费信息如下:
2018年,消费310元 2019年,消费29元 2020年,消费582元 共消费:921
用户:小米消费信息如下:
2018年,消费946元 2019年,消费524元 2020年,消费777元 共消费:2247
用户:李四消费信息如下:
2018年,消费557元 2019年,消费453元 2020年,消费262元 共消费:1272
用户:小红消费信息如下:
2018年,消费589元 2019年,消费412元 2020年,消费631元 共消费:1632
用户:小张消费信息如下:
2018年,消费228元 2019年,消费380元 2020年,消费34元 共消费:642
用户:张三消费信息如下:
2018年,消费150元 2019年,消费691元 2020年,消费802元 共消费:1643
所有用户消费总金额:9765
4、CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重 置。
所以CyclicBarrier能处理更为复杂的业务场景。
例如,如果计算发生错误,可以重置计数 器,并让线程重新执行一次。
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier 阻塞的线程数量。
isBroken()方法用来了解阻塞的线程是否被中断
到此这篇关于Java并发工具之CyclicBarrier使用详解的文章就介绍到这了,更多相关CyclicBarrier使用详解内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!