CountDownLatch 允许一个或多个线程等待其他线程完成操作。
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。
当我们调用一次CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await会阻塞当前线程,直到N变成零。
由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。
一、应用举例
// 老板进入会议室等待5个人全部到达会议室才会开会。所以这里有两个线程老板等待开会线程、员工到达会议室: class CountDownLatchTest { private static CountDownLatch countDownLatch = new CountDownLatch(5); // Boss线程,等待员工到达开会 static class BossThread extends Thread { @Override public void run() { System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会..."); try { countDownLatch.await(); // Boss等待 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("所有人都已经到齐了,开会吧..."); } } // 员工到达会议室 static class EmpleoyeeThread extends Thread { @Override public void run() { System.out.println(Thread.currentThread().getName() + ",到达会议室...."); // 员工到达会议室 count - 1 countDownLatch.countDown(); } } public static void main(String[] args) { // Boss线程启动 new BossThread().start(); // 员工到达会议室 for (int i = 0; i < countDownLatch.getCount(); i++) { new EmpleoyeeThread().start(); } } }
二、类结构
public class CountDownLatch { private final Sync sync; // 锁 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } }
三、原理解析
CountDownLatch countDownLatch = new CountDownLatch(5);
public CountDownLatch(int count) {
if (count < 0)
throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* CountDownLatch.Sync.Sync(int)
* AQS的state用作count计数
*/
Sync(int count) {
setState(count);
}
countDownLatch.await();
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* AbstractQueuedSynchronizer.acquireSharedInterruptibly(int)
* 尝试获取锁,获取不到锁,当前进入同步队列并挂起
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 尝试获取锁
doAcquireSharedInterruptibly(arg); // 获取不到锁,当前进入同步队列并挂起
}
/**
* CountDownLatch.Sync.tryAcquireShared(int)
* state/count没有减到0之前不予许拿锁
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
countDownLatch.countDown();
public void countDown() {
sync.releaseShared(1);
}
/**
* AbstractQueuedSynchronizer.releaseShared(int)
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 尝试释放锁
doReleaseShared(); // 释放掉锁之后,唤醒同步队列的线程(调用await()的线程)
return true;
}
return false;
}
/**
* CountDownLatch.Sync.tryReleaseShared(int)
* countDown一次count/state减一
* 直到count/state减到0,return true,允许释放同步队列里的线程
*/
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
四、CyclicBarrier和CountDownLatch的区别
- CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
- CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。
继续阅读与本文标签相同的文章
-
阿里云开发者认证规则
2026-05-19栏目: 教程
-
RPA干货丨详解RPA的设计与构建
2026-05-19栏目: 教程
-
大数据学习路线分享Scala系列之数组
2026-05-19栏目: 教程
-
手把手教您将 Ghostscript 移植到函数计算平台
2026-05-19栏目: 教程
-
阿里技术专家详解 DDD 系列- Domain Primitive | 9月3号栖夜读
2026-05-19栏目: 教程
