CyclicBarrier之多线程中的循环栅栏详解
作者:safe_u
1. CyclicBarrier简介
现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。
在JUC包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是CyclicBarrier类。利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点(栅栏)后再进行后续的操作。下图演示了这一过程:
CyclicBarrier可以使一定数量的线程反复地在栅栏(不同轮次或不同代)位置处汇集。
- CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 和 CountDownLatch 很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。
- 当线程到达栅栏位置时将调用await()方法,这个方法将阻塞(当前线程)直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。
2.CyclicBarrier的使用
2.1 常用方法
//参数parties:表示要到达屏障 (栅栏)的线程数量 //参数Runnable: 最后一个线程到达屏障之后要做的任务 //构造方法1 public CyclicBarrier(int parties) //构造方法2 public CyclicBarrier(int parties, Runnable barrierAction) //线程调用await()方法表示当前线程已经到达栅栏,然后会被阻塞 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } //带时限的阻塞等待 public int await(long timeout, TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException { return dowait(true, unit.toNanos(timeout)); }
2.2 使用举例
适用场景:可用于需要多个线程均到达某一步之后才能继续往下执行的场景
//循环栅栏-可多次循环使用 CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{ System.out.println(Thread.currentThread().getName()+" 完成最后任务!"); }); IntStream.range(1,6).forEach(i->{ new Thread(()->{ try { Thread.sleep(new Double(Math.random()*3000).longValue()); System.out.println(Thread.currentThread().getName()+" 到达栅栏A"); cyclicBarrier.await();//屏障点A,当前线程会阻塞至此,等待计数器=0 System.out.println(Thread.currentThread().getName()+" 冲破栅栏A"); }catch (Exception e){ e.printStackTrace(); } }).start(); });
3.CyclicBarrier原理
CyclicBarrier是一道屏障,调用await()方法后,当前线程进入阻塞,当parties数量的线程调用await方法后,所有的await方法会返回并继续往下执行。
3.1 成员变量
/** CyclicBarrier使用的排他锁*/ private final ReentrantLock lock = new ReentrantLock(); /** barrier被冲破前,线程等待的condition*/ private final Condition trip = lock.newCondition(); /** barrier被冲破时,需要满足的参与线程数。*/ private final int parties; /* barrier被冲破后执行的方法。*/ private final Runnable barrierCommand; /** 当其轮次 */ private Generation generation = new Generation(); /** *目前等待剩余的参与者数量。从 parties倒数到0。每个轮次该值会被重置回parties */ private int count;
(1)CyclicBarrier内部是通过条件队列trip来对线程进行阻塞的,并且其内部维护了两个int型的变量parties和count。
- parties表示每次拦截的线程数,该值在构造时进行赋值。
- count是内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒。
(2)CyclicBarrier有一个静态内部类Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待
(3)barrierCommand表示换代前执行的任务,当count减为0时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定barrierCommand来执行自己的任务。
3.2 构造器
//构造器1:指定本局要拦截的线程数parties 及 本局结束时要执行的任务 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //构造器2 public CyclicBarrier(int parties) { this(parties, null); }
3.3 等待的方法
CyclicBarrier类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待。源代码中await()方法最终调用的是dowait()方法:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,TimeoutException { // 获取独占锁 final ReentrantLock lock = this.lock; lock.lock();//对共享资源count,generation操作前,需先上锁保证线程安全 try { // 当前代--当前轮次对象的引用 final Generation g = generation; // 如果这轮次broken了,抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果线程中断了,抛出异常 if (Thread.interrupted()) { breakBarrier();//如果被打断,通过此方法设置当前轮次为broken状态,通知当前轮次所有等待的线程 throw new InterruptedException();//抛出异常 } //自旋前 //1、count值-1 int index = --count; // 2、判断是否到0,若是,则冲破屏障点(说明最后一个线程已经到达) if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; // 3、执行栅栏任务(若CyclicBarrier构造时传入了Runnable,则调用) if (command != null) command.run(); ranAction = true; // 4、更新一轮次,将count重置,将generation重置,唤醒之前等待的线程 nextGeneration(); return 0; } finally { // 如果执行栅栏任务(command)的时候出现了异常,那么就认为本轮次破环了 if (!ranAction) breakBarrier(); } } //计数器没有到0 =》开始自旋,直到屏障被冲破,或者interrupted或者超时 for (;;) { try { // 开始等待;如果没有时间限制,则直接等待,直到被唤醒(让其他线程进入到lock的代码块执行以上逻辑) if (!timed) trip.await();//阻塞,此时会释放锁,以让其他线程进入await方法中,等待屏障被冲破后,向后执行 // 如果有时间限制,则等待指定时间 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 如果当前线程阻塞被interrupt了,并且本轮次还没有被break,那么修改本轮次状态为broken if (g == generation && ! g.broken) { // 让栅栏失效 breakBarrier(); throw ie; } else { // 上面条件不满足,说明这个线程不是这轮次的,就不会影响当前这代栅栏的执行,所以,就打个中断标记 Thread.currentThread().interrupt(); } } // 当有任何一个线程中断了,就会调用breakBarrier方法,就会唤醒其他的线程,其他线程醒来后,也要抛出异常 if (g.broken) throw new BrokenBarrierException(); // g != generation表示正常换轮次了,返回当前线程所在栅栏的下标 // 如果 g == generation,说明还没有换,那为什么会醒了? // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。 // 正是因为这个原因,才需要generation来保证正确。 if (g != generation) return index; // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } }//自旋 } finally { // 释放独占锁 lock.unlock(); } }
更新本轮次的方法:nextGeneration()
private void nextGeneration() { trip.signalAll();//唤醒本轮次等待的线程 count = parties;//重置count值为初始值,为下一轮次(代)使用 generation = new Generation();//更新本轮次对象,这样自旋中的线程才会跳出自旋。 } private static class Generation { boolean broken = false; } private void breakBarrier() { generation.broken = true;//设置标识 count = parties;//重置count值为初始值 trip.signalAll();//唤醒所有等待线程 }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。