Jdk1.6 JUC源码解析(11)-CyclicBarrier
作者:大飞
功能简介:
- CyclicBarrier是一种可重复使用的栅栏机制,可以让一组线程在某个点上相互等待,这个点就可以类比为栅栏。并且这个栅栏是可重复使用的,这点可以和前面分析过的CountDownLatch做对比,CountDownLatch只能用一次。
- CyclicBarrier还支持在所有线程到达栅栏之后,在所有线程从等待状态转到可运行状态之前,执行一个命令(或者说是动作)。
- 当然,在某些情况下,栅栏可以被打破。比如某个线程无法在规定的时间内到达栅栏。
源码分析:
- 先看下CyclicBarrier内部的结构:
public class CyclicBarrier { /** * 每次对栅栏的使用可以表示为一个generation。栅栏每次开放或者重置, * generation都会发生改变。使用栅栏的线程可以关联多个generations, * 由于等待线程可能会以多种方式请求锁,但是在特定的时间只有一个是 * 可用的,其他的要么被打破,要么开放。 * 如果一个栅栏已经被打破。且没有后续的重置动作,那么可以不存在可 * 用的generation。 */ private static class Generation { boolean broken = false; } /** 用于保护栅栏的锁 */ private final ReentrantLock lock = new ReentrantLock(); /** 栅栏开放的条件 */ private final Condition trip = lock.newCondition(); /** 表示当前使用栅栏的使用方(线程)数量 */ private final int parties; /* 当栅栏开放时,要使用的命令(动作) */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * 处于等待状态的使用方(线程)的数量,在每一个generation上从 * parties递减为0。当新建generation(栅栏开放)或者栅栏被打破 * 时,重置为parties。 */ private int count;
从上述的结构细节中可见,CyclicBarrier内部使用ReentrantLock来实现,并包含一个trip条件,来作为栅栏模拟栅栏的行为(所有使用方都在这个条件上等待)。
- 具体使用栅栏时,各个线程会在要互相等待的地方调用一个"等待"方法,然后在这个方法处等待。当所有线程都到达次方法时,栅栏打开,所有线程从等待出继续执行。接下来就从这个"等待"方法入手开始分析:
/** * 线程调用此方法后等待,直到所有parties都调用当前barrier的这个方法。 * * 如果当前线程是不最后一个到达此方法的线程,那么会阻塞,直到下面的 * 事情发生: * * 最后的线程也到达此方法。 * 其他线程中断了当前线程。 * 其他线程中断了在栅栏处等待的某个线程。 * 某个线程调用了栅栏的reset方法。 */ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } /** * 和上面的方法相比,多了超时的情况。 */ public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
看下上述等待方法中调用的dowait方法:
/** * 栅栏主体代码,涵盖了所有情况。 */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { //获取当前的generation实例。 final Generation g = generation; if (g.broken) //如果当前generation状态为broken,说明栅栏被打破,抛出BrokenBarrierException异常。 throw new BrokenBarrierException(); if (Thread.interrupted()) { //如果当前线程被中断,打破栅栏,然后抛出中断异常。 breakBarrier(); throw new InterruptedException(); } //计算当前到达线程的下标。 int index = --count; //下标为0表示当前线程为最后一个使用栅栏的线程。 if (index == 0) { // 栅栏开放。 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null)//如果有栅栏命令,执行栅栏命令。 command.run();//看来栅栏的命令是由最后一个到达栅栏的线程执行。 ranAction = true; //产生新的generation。 nextGeneration(); return 0; } finally { if (!ranAction) //如果栅栏命令未执行,打破栅栏。 breakBarrier(); } } // 等待中的主循环,直到栅栏开放、栅栏被打破、线程被打断或者超时时退出。 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { //如果出于当前generation 且generation状态为未打破,那么打破栅栏。 breakBarrier(); throw ie; } else { // 如果没被中断的话,我们即将完成等待。 // 所以这个中断被算作下一次执行的中断。 Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation)//如果generation改变了,说明之前的栅栏已经开放,返回index。 return index; if (timed && nanos <= 0L) { breakBarrier();//如果超时,打破栅栏,并返回超时异常。 throw new TimeoutException(); } } } finally { lock.unlock(); } }
从dowait方法中可以看到,当所有使用者都到达时,栅栏开放,会调用nextGeneration方法;如果有其他情况(超时、中断等)发生,会调用breakBarrier方法。先看下nextGeneration:
/** * 更新栅栏状态,唤醒所有在栅栏处等待的线程。 * 这个方法只有在持有锁的情况下被调用。 */ private void nextGeneration() { //唤醒所有在栅栏处等待的线程。 trip.signalAll(); //重置count。 count = parties; //产生新的generation。 generation = new Generation(); }
再看下breakBarrier方法:
/** * 设置当前栅栏generation状态为打破状态,并唤醒栅栏处的等待线程。 * 这个方法只有在持有锁的情况下被调用。 */ private void breakBarrier() { //设置"打破"状态。 generation.broken = true; //重置count。 count = parties; //唤醒所有在栅栏处等待的线程。 trip.signalAll(); }
dowait内部为栅栏的主要逻辑,经过上面的分析,应该很清晰了。最后看下其他的方法:
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); } public int getParties() { return parties; } public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { //重置逻辑,先打破当前的栅栏,然后建立一个新的。 breakBarrier(); nextGeneration(); } finally { lock.unlock(); } } /** * 获取在栅栏处等待的使用方(线程)数量。 */ public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
小总结一下:
1.当建立一个使用方数量为n的栅栏时,栅栏内部有一个为n的计数。当使用方调用await方法时,如果其他n-1个使用方没有全部到达await方法(内部计数减1后,不等于0),那么使用方(线程)阻塞等待。
2.当第n个使用方调用await时,栅栏开放(内部计数减1后等于0),会唤醒所有在await方法上等待着的使用方(线程),大家一起通过栅栏,然后重置栅栏(内部计数又变成n),栅栏变成新建后的状态,可以再次使用。
CyclicBarrier的代码解析完毕!
相关推荐
aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...
JDK 1.6 64位(jdk-6u45-windows-x64(1.6 64))
2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar
适用平台:windows x64 jdk版本:1.6 安装方式:双击安装即可
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
jdk-jdk1.6.0.24-windows-i586.exe
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
logback-cfca-jdk1.6-3.1.0.0.jar
jdk1.6 源码
文档介绍了JDK1.6安装及与JDK-1.5版本共存,已经在我的机子上通过了测试,按照文档操作即可。
jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3
JDK-1.6-Windows-32位 纯官方安装版 JDK-1.6-Windows-32位 纯官方安装版
jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3
jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
Java编程开发工具包,最新版本,很好用,经典
1、okhttp3.8源码使用jdk1.6重新编译,已集成了okio,仅在javaweb项目中使用。 2、另附json-20160810.jar