抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

AQS(AbstractQueuedSynchronizer)是Java并发包的核心基础类,ReentrantLock、Semaphore、CountDownLatch等同步工具都基于它实现。本文深入剖析AQS的设计原理和实现细节。

AQS是什么

AQS(抽象队列同步器)是一个用于构建锁和同步器的框架。它提供了:

  1. 同步状态管理(state变量)
  2. 线程排队机制(CLH队列)
  3. 线程阻塞/唤醒机制(LockSupport)
1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────────────────────────────────┐
│ JUC同步器 │
├──────────────┬──────────────┬──────────────┬────────────────┤
│ ReentrantLock│ Semaphore │CountDownLatch│ ReadWriteLock │
├──────────────┴──────────────┴──────────────┴────────────────┤
│ AQS │
│ AbstractQueuedSynchronizer │
├─────────────────────────────────────────────────────────────┤
│ LockSupport │
│ (park / unpark) │
└─────────────────────────────────────────────────────────────┘

核心数据结构

state状态变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public abstract class AbstractQueuedSynchronizer {
// 同步状态,不同同步器有不同含义
private volatile int state;

protected final int getState() {
return state;
}

protected final void setState(int newState) {
state = newState;
}

// CAS修改state
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
}

state在不同同步器中的含义:

同步器 state含义
ReentrantLock 0=未锁定,>0=锁定次数(重入)
Semaphore 可用许可数量
CountDownLatch 剩余计数
ReentrantReadWriteLock 高16位=读锁数,低16位=写锁数

CLH队列

AQS使用变种的CLH队列管理等待线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────────────────────────────────────────────────────────┐
│ CLH队列(双向链表) │
│ │
│ head tail │
│ │ │ │
│ ▼ ▼ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ Node │◀───▶│ Node │◀───▶│ Node │◀───▶│ Node │ │
│ │(哨兵)│ │ T1 │ │ T2 │ │ T3 │ │
│ │ │ │SIGNAL│ │SIGNAL│ │ 0 │ │
│ └──────┘ └──────┘ └──────┘ └──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ WAITING WAITING WAITING │
│ │
└─────────────────────────────────────────────────────────────────┘

Node节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static final class Node {
// 节点状态
volatile int waitStatus;

// 前驱节点
volatile Node prev;

// 后继节点
volatile Node next;

// 节点关联的线程
volatile Thread thread;

// 条件队列中的下一个节点(Condition使用)
Node nextWaiter;

// 共享模式标记
static final Node SHARED = new Node();

// 独占模式标记
static final Node EXCLUSIVE = null;

// waitStatus常量
static final int CANCELLED = 1; // 线程已取消
static final int SIGNAL = -1; // 后继节点需要被唤醒
static final int CONDITION = -2; // 在条件队列中等待
static final int PROPAGATE = -3; // 共享模式下传播唤醒
}

waitStatus状态

1
2
3
4
5
6
7
8
9
10
11
12
waitStatus状态转换:

0 (初始) ──▶ SIGNAL (-1) 正常等待,需要唤醒后继

├──▶ CANCELLED (1) 线程取消/超时/中断

└──▶ CONDITION (-2) 在条件队列中(Condition.await)

SIGNAL: 当前节点释放锁时需要unpark后继节点
CANCELLED: 节点已取消,会被清理出队列
CONDITION: 节点在条件队列中,等待signal转移到同步队列
PROPAGATE: 共享模式下,释放操作需要传播给后续节点

两种模式

独占模式(Exclusive)

同一时刻只有一个线程能获取同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 独占获取
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 独占释放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

使用独占模式的同步器:

  • ReentrantLock
  • ReentrantReadWriteLock的写锁

共享模式(Shared)

多个线程可以同时获取同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 共享获取
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

// 共享释放
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

使用共享模式的同步器:

  • Semaphore
  • CountDownLatch
  • ReentrantReadWriteLock的读锁

独占模式源码分析

acquire获取锁

1
2
3
4
5
6
7
8
public final void acquire(int arg) {
// 1. tryAcquire尝试获取锁(子类实现)
// 2. 失败则addWaiter加入队列
// 3. acquireQueued自旋获取锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

addWaiter入队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node addWaiter(Node mode) {
Node node = new Node(mode);

for (;;) {
Node oldTail = tail;
if (oldTail != null) {
// 设置前驱
node.setPrevRelaxed(oldTail);
// CAS设置尾节点
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
// 队列为空,初始化
initializeSyncQueue();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
addWaiter过程:

初始状态(队列为空):
head = null, tail = null

第一个线程入队:
1. 初始化,创建哨兵节点
head ──▶ [哨兵] ◀── tail

2. 新节点加入
head ──▶ [哨兵] ◀──▶ [Node T1] ◀── tail

后续线程入队:
head ──▶ [哨兵] ◀──▶ [T1] ◀──▶ [T2] ◀──▶ [T3] ◀── tail

acquireQueued自旋获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();

// 如果前驱是head,尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node); // 成功,设为新head
p.next = null; // help GC
return interrupted;
}

// 检查是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

shouldParkAfterFailedAcquire判断是否阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;

if (ws == Node.SIGNAL)
// 前驱状态是SIGNAL,可以安全阻塞
return true;

if (ws > 0) {
// 前驱已取消,跳过取消的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前驱状态设为SIGNAL
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
为什么需要SIGNAL状态?

线程阻塞前必须确保前驱会唤醒自己:
1. 将前驱状态设为SIGNAL
2. 前驱释放锁时检查状态,发现SIGNAL则唤醒后继

避免信号丢失:
T1持有锁 ──▶ [head] ◀──▶ [T2 SIGNAL]

T1释放锁时:
1. 检查head.waitStatus == SIGNAL
2. 调用unparkSuccessor(head)唤醒T2

release释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final boolean release(int arg) {
// tryRelease由子类实现
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒后继
return true;
}
return false;
}

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);

// 找到需要唤醒的后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾部向前找有效节点
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒
}

完整流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
acquire流程:

tryAcquire()

┌────┴────┐
成功 失败
│ │
▼ ▼
获得锁 addWaiter()


acquireQueued()

┌────┴────┐
前驱是head 否
│ │
▼ ▼
tryAcquire shouldPark?
│ │
┌────┴───┐ │
成功 失败 ▼
│ └──▶ park()
▼ │
设为head unpark后继续
│ 自旋尝试

返回


release流程:

tryRelease()

┌────┴────┐
成功 失败
│ │
▼ ▼
head.ws != 0? 返回false

┌────┴────┐
是 否
│ │
▼ ▼
unparkSuccessor 返回true


返回true

共享模式源码分析

acquireShared获取共享锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final void acquireShared(int arg) {
// tryAcquireShared返回值:
// < 0: 获取失败
// = 0: 获取成功,但后续共享获取不会成功
// > 0: 获取成功,后续共享获取也可能成功
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功获取,设置head并传播
setHeadAndPropagate(node, r);
p.next = null;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}

setHeadAndPropagate传播唤醒

1
2
3
4
5
6
7
8
9
10
11
12
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);

// 如果还有剩余资源或需要传播,唤醒后继共享节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
共享模式的传播机制:

Semaphore(3):3个许可

队列:[head] ◀──▶ [T1 SHARED] ◀──▶ [T2 SHARED] ◀──▶ [T3 SHARED]

释放1个许可:
1. T1被唤醒,获取成功,propagate > 0
2. T1调用setHeadAndPropagate
3. 唤醒T2
4. T2获取成功,继续传播唤醒T3
5. 连锁唤醒所有可以获取的共享节点

releaseShared释放共享锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}

基于AQS的同步器实现

ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class ReentrantLock {
private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {
// 非公平尝试获取
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 无锁,CAS获取
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// 释放
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}

// 非公平锁
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

// 公平锁
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 检查队列中是否有等待者
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
setState(nextc);
return true;
}
return false;
}
}
}

Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Semaphore {
private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits); // state = 许可数量
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}

// 使用
// Semaphore sem = new Semaphore(3);
// sem.acquire(); // state: 3 -> 2
// sem.acquire(); // state: 2 -> 1
// sem.release(); // state: 1 -> 2
}

CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CountDownLatch {
private final Sync sync;

private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count); // state = 计数
}

int getCount() {
return getState();
}

// 共享获取:count为0时成功
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

// 共享释放:count减1
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0; // 减到0时返回true,触发唤醒
}
}
}

// 使用
// CountDownLatch latch = new CountDownLatch(3);
// latch.countDown(); // state: 3 -> 2
// latch.countDown(); // state: 2 -> 1
// latch.countDown(); // state: 1 -> 0,唤醒所有await的线程
// latch.await(); // state != 0 时阻塞
}

ReentrantReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class ReentrantReadWriteLock {
// state的高16位表示读锁数量,低16位表示写锁数量
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 读锁数量
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 写锁数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

abstract static class Sync extends AbstractQueuedSynchronizer {
// 写锁获取(独占)
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 有锁
if (w == 0 || current != getExclusiveOwnerThread())
return false; // 有读锁,或写锁不是自己的
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires); // 写锁重入
return true;
}
// 无锁,CAS获取写锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

// 读锁获取(共享)
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 有写锁且不是自己持有
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 成功获取读锁
return 1;
}
return fullTryAcquireShared(current);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
ReentrantReadWriteLock的state:

state (32位)
┌─────────────────┬─────────────────┐
│ 高16位 │ 低16位 │
│ 读锁数量 │ 写锁数量 │
└─────────────────┴─────────────────┘

例:state = 0x00020001
读锁数量 = 2
写锁数量 = 1

自定义同步器

实现一个简单的互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class SimpleMutex {
private final Sync sync = new Sync();

private static class Sync extends AbstractQueuedSynchronizer {
// 是否被独占
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

// 尝试获取锁
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// 尝试释放锁
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}

Condition newCondition() {
return new ConditionObject();
}
}

public void lock() {
sync.acquire(1);
}

public void unlock() {
sync.release(1);
}

public boolean tryLock() {
return sync.tryAcquire(1);
}

public Condition newCondition() {
return sync.newCondition();
}
}

实现一个共享锁(允许N个线程同时访问)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class SharedLock {
private final Sync sync;

public SharedLock(int count) {
sync = new Sync(count);
}

private static class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}

@Override
protected int tryAcquireShared(int arg) {
for (;;) {
int current = getState();
int newCount = current - arg;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}

@Override
protected boolean tryReleaseShared(int arg) {
for (;;) {
int current = getState();
int newCount = current + arg;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}

public void lock() {
sync.acquireShared(1);
}

public void unlock() {
sync.releaseShared(1);
}
}

实现一个闭锁(只能打开一次)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class OneShotLatch {
private final Sync sync = new Sync();

private static class Sync extends AbstractQueuedSynchronizer {
// state: 0=关闭,1=打开

@Override
protected int tryAcquireShared(int arg) {
// 打开状态返回1(成功),关闭状态返回-1(失败)
return getState() == 1 ? 1 : -1;
}

@Override
protected boolean tryReleaseShared(int arg) {
setState(1); // 打开闭锁
return true;
}
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public void signal() {
sync.releaseShared(1);
}
}

// 使用
OneShotLatch latch = new OneShotLatch();

// 等待线程
new Thread(() -> {
latch.await(); // 阻塞直到signal
System.out.println("继续执行");
}).start();

// 触发线程
Thread.sleep(1000);
latch.signal(); // 打开闭锁,唤醒所有等待线程

Condition条件队列

AQS不仅有同步队列,还支持条件队列(通过ConditionObject实现)。

条件队列结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
                    同步队列(CLH队列)
┌──────────────────────────────────────┐
│ │
head ──▶│ [哨兵] ◀──▶ [T1] ◀──▶ [T2] ◀──▶ [T3]│◀── tail
│ │
└──────────────────────────────────────┘

条件队列(单向链表)
┌──────────────────────────────────────┐
│ │
firstWaiter ──▶ [T4] ──▶ [T5] ──▶ [T6] ◀── lastWaiter
│ CONDITION │
└──────────────────────────────────────┘

await():从同步队列移到条件队列
signal():从条件队列移到同步队列

await实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();

// 1. 加入条件队列
Node node = addConditionWaiter();

// 2. 完全释放锁
int savedState = fullyRelease(node);

int interruptMode = 0;
// 3. 循环检查是否在同步队列中
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

// 4. 被signal后,重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;

// 5. 清理取消的节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();

if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

signal实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();

Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
// 移除条件队列头节点
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 转移到同步队列
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
// CAS修改状态:CONDITION -> 0
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;

// 加入同步队列尾部
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 唤醒
return true;
}

await/signal流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
await()流程:

持有锁的线程T1调用await()


加入条件队列


完全释放锁(state -> 0)


park()阻塞

等待signal...


被signal唤醒


重新获取锁(acquireQueued)


从await()返回


signal()流程:

持有锁的线程T2调用signal()


从条件队列取出节点


转移到同步队列尾部


unpark()唤醒线程(如果需要)


T2继续执行(还持有锁)


T2释放锁后,T1才能重新获取

总结

AQS核心设计

组件 作用
state 同步状态,由子类定义语义
CLH队列 管理等待获取锁的线程
条件队列 管理await的线程
Node 封装线程和等待状态
LockSupport 线程阻塞/唤醒原语

模板方法

方法 模式 作用
tryAcquire 独占 尝试获取锁(子类实现)
tryRelease 独占 尝试释放锁(子类实现)
tryAcquireShared 共享 尝试获取共享锁(子类实现)
tryReleaseShared 共享 尝试释放共享锁(子类实现)
isHeldExclusively 独占 是否被当前线程独占

基于AQS的同步器

同步器 模式 state含义
ReentrantLock 独占 锁重入次数
Semaphore 共享 许可数量
CountDownLatch 共享 剩余计数
ReentrantReadWriteLock 独占+共享 高16位读/低16位写
StampedLock 乐观读 版本戳

AQS是Java并发编程的基石,理解它的原理有助于更好地使用和扩展各种同步工具。