AQS(AbstractQueuedSynchronizer)是Java并发包的核心基础类,ReentrantLock、Semaphore、CountDownLatch等同步工具都基于它实现。本文深入剖析AQS的设计原理和实现细节。
AQS是什么 AQS(抽象队列同步器)是一个用于构建锁和同步器的框架。它提供了:
同步状态管理 (state变量)
线程排队机制 (CLH队列)
线程阻塞/唤醒机制 (LockSupport)
1 2 3 4 5 6 7 8 9 10 11 ┌─────────────────────────────────────────────────────────────┐ │ JUC同步器 │ ├──────────────┬──────────────┬──────────────┬────────────────┤ │ ReentrantLock│ Semaphore │CountDownLatch│ ReadWriteLock │ ├──────────────┴──────────────┴──────────────┴────────────────┤ │ AQS │ │ AbstractQueuedSynchronizer │ ├─────────────────────────────────────────────────────────────┤ │ LockSupport │ │ (park / unpark) │ └─────────────────────────────────────────────────────────────┘
核心数据结构 state状态变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public abstract class AbstractQueuedSynchronizer { private volatile int state; protected final int getState () { return state; } protected final void setState (int newState) { state = newState; } protected final boolean compareAndSetState (int expect, int update) { return STATE.compareAndSet(this , expect, update); } }
state在不同同步器中的含义:
同步器
state含义
ReentrantLock
0=未锁定,>0=锁定次数(重入)
Semaphore
可用许可数量
CountDownLatch
剩余计数
ReentrantReadWriteLock
高16位=读锁数,低16位=写锁数
CLH队列 AQS使用变种的CLH队列管理等待线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ┌─────────────────────────────────────────────────────────────────┐ │ CLH队列(双向链表) │ │ │ │ head tail │ │ │ │ │ │ ▼ ▼ │ │ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ │ │ Node │◀───▶│ Node │◀───▶│ Node │◀───▶│ Node │ │ │ │(哨兵)│ │ T1 │ │ T2 │ │ T3 │ │ │ │ │ │SIGNAL│ │SIGNAL│ │ 0 │ │ │ └──────┘ └──────┘ └──────┘ └──────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ WAITING WAITING WAITING │ │ │ └─────────────────────────────────────────────────────────────────┘
Node节点 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 static final class Node { volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; }
waitStatus状态 1 2 3 4 5 6 7 8 9 10 11 12 waitStatus状态转换: 0 (初始) ──▶ SIGNAL (-1) 正常等待,需要唤醒后继 │ ├──▶ CANCELLED (1) 线程取消/超时/中断 │ └──▶ CONDITION (-2) 在条件队列中(Condition.await) SIGNAL: 当前节点释放锁时需要unpark后继节点 CANCELLED: 节点已取消,会被清理出队列 CONDITION: 节点在条件队列中,等待signal转移到同步队列 PROPAGATE: 共享模式下,释放操作需要传播给后续节点
两种模式 独占模式(Exclusive) 同一时刻只有一个线程能获取同步状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
使用独占模式的同步器:
ReentrantLock
ReentrantReadWriteLock的写锁
共享模式(Shared) 多个线程可以同时获取同步状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
使用共享模式的同步器:
Semaphore
CountDownLatch
ReentrantReadWriteLock的读锁
独占模式源码分析 acquire获取锁 1 2 3 4 5 6 7 8 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
addWaiter入队 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private Node addWaiter (Node mode) { Node node = new Node (mode); for (;;) { Node oldTail = tail; if (oldTail != null ) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 addWaiter过程: 初始状态(队列为空): head = null, tail = null 第一个线程入队: 1. 初始化,创建哨兵节点 head ──▶ [哨兵] ◀── tail 2. 新节点加入 head ──▶ [哨兵] ◀──▶ [Node T1] ◀── tail 后续线程入队: head ──▶ [哨兵] ◀──▶ [T1] ◀──▶ [T2] ◀──▶ [T3] ◀── tail
acquireQueued自旋获取 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 final boolean acquireQueued (final Node node, int arg) { boolean interrupted = false ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
shouldParkAfterFailedAcquire判断是否阻塞 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false ; }
1 2 3 4 5 6 7 8 9 10 11 12 为什么需要SIGNAL状态? 线程阻塞前必须确保前驱会唤醒自己: 1. 将前驱状态设为SIGNAL 2. 前驱释放锁时检查状态,发现SIGNAL则唤醒后继 避免信号丢失: T1持有锁 ──▶ [head] ◀──▶ [T2 SIGNAL] T1释放锁时: 1. 检查head.waitStatus == SIGNAL 2. 调用unparkSuccessor(head)唤醒T2
release释放锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) node.compareAndSetWaitStatus(ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node p = tail; p != node && p != null ; p = p.prev) if (p.waitStatus <= 0 ) s = p; } if (s != null ) LockSupport.unpark(s.thread); }
完整流程图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 acquire流程: tryAcquire() │ ┌────┴────┐ 成功 失败 │ │ ▼ ▼ 获得锁 addWaiter() │ ▼ acquireQueued() │ ┌────┴────┐ 前驱是head 否 │ │ ▼ ▼ tryAcquire shouldPark? │ │ ┌────┴───┐ │ 成功 失败 ▼ │ └──▶ park() ▼ │ 设为head unpark后继续 │ 自旋尝试 ▼ 返回 release流程: tryRelease() │ ┌────┴────┐ 成功 失败 │ │ ▼ ▼ head.ws != 0? 返回false │ ┌────┴────┐ 是 否 │ │ ▼ ▼ unparkSuccessor 返回true │ ▼ 返回true
共享模式源码分析 acquireShared获取共享锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean interrupted = false ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; return ; } } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); throw t; } finally { if (interrupted) selfInterrupt(); } }
setHeadAndPropagate传播唤醒 1 2 3 4 5 6 7 8 9 10 11 12 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
1 2 3 4 5 6 7 8 9 10 11 12 共享模式的传播机制: Semaphore(3):3个许可 队列:[head] ◀──▶ [T1 SHARED] ◀──▶ [T2 SHARED] ◀──▶ [T3 SHARED] 释放1个许可: 1. T1被唤醒,获取成功,propagate > 0 2. T1调用setHeadAndPropagate 3. 唤醒T2 4. T2获取成功,继续传播唤醒T3 5. 连锁唤醒所有可以获取的共享节点
releaseShared释放共享锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
基于AQS的同步器实现 ReentrantLock 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public class ReentrantLock { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } } static final class NonfairSync extends Sync { protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; setState(nextc); return true ; } return false ; } } }
Semaphore 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class Semaphore { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits ) { setState(permits ); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error ("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } } }
CountDownLatch 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class CountDownLatch { private final Sync sync; private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount () { return getState(); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c - 1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } } }
ReentrantReadWriteLock 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class ReentrantReadWriteLock { static final int SHARED_SHIFT = 16 ; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1 ; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1 ; static int sharedCount (int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount (int c) { return c & EXCLUSIVE_MASK; } abstract static class Sync extends AbstractQueuedSynchronizer { protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; } protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { return 1 ; } return fullTryAcquireShared(current); } } }
1 2 3 4 5 6 7 8 9 10 11 ReentrantReadWriteLock的state: state (32位) ┌─────────────────┬─────────────────┐ │ 高16位 │ 低16位 │ │ 读锁数量 │ 写锁数量 │ └─────────────────┴─────────────────┘ 例:state = 0x00020001 读锁数量 = 2 写锁数量 = 1
自定义同步器 实现一个简单的互斥锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class SimpleMutex { private final Sync sync = new Sync (); private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean isHeldExclusively () { return getState() == 1 ; } @Override protected boolean tryAcquire (int arg) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { if (getState() == 0 ) throw new IllegalMonitorStateException (); setExclusiveOwnerThread(null ); setState(0 ); return true ; } Condition newCondition () { return new ConditionObject (); } } public void lock () { sync.acquire(1 ); } public void unlock () { sync.release(1 ); } public boolean tryLock () { return sync.tryAcquire(1 ); } public Condition newCondition () { return sync.newCondition(); } }
实现一个共享锁(允许N个线程同时访问) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class SharedLock { private final Sync sync; public SharedLock (int count) { sync = new Sync (count); } private static class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } @Override protected int tryAcquireShared (int arg) { for (;;) { int current = getState(); int newCount = current - arg; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } @Override protected boolean tryReleaseShared (int arg) { for (;;) { int current = getState(); int newCount = current + arg; if (compareAndSetState(current, newCount)) { return true ; } } } } public void lock () { sync.acquireShared(1 ); } public void unlock () { sync.releaseShared(1 ); } }
实现一个闭锁(只能打开一次) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class OneShotLatch { private final Sync sync = new Sync (); private static class Sync extends AbstractQueuedSynchronizer { @Override protected int tryAcquireShared (int arg) { return getState() == 1 ? 1 : -1 ; } @Override protected boolean tryReleaseShared (int arg) { setState(1 ); return true ; } } public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public void signal () { sync.releaseShared(1 ); } } OneShotLatch latch = new OneShotLatch ();new Thread (() -> { latch.await(); System.out.println("继续执行" ); }).start(); Thread.sleep(1000 ); latch.signal();
Condition条件队列 AQS不仅有同步队列,还支持条件队列(通过ConditionObject实现)。
条件队列结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 同步队列(CLH队列) ┌──────────────────────────────────────┐ │ │ head ──▶│ [哨兵] ◀──▶ [T1] ◀──▶ [T2] ◀──▶ [T3]│◀── tail │ │ └──────────────────────────────────────┘ 条件队列(单向链表) ┌──────────────────────────────────────┐ │ │ firstWaiter ──▶ [T4] ──▶ [T5] ──▶ [T6] ◀── lastWaiter │ CONDITION │ └──────────────────────────────────────┘ await():从同步队列移到条件队列 signal():从条件队列移到同步队列
await实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); }
signal实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); } private void doSignal (Node first) { do { if ((firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!node.compareAndSetWaitStatus(Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
await/signal流程图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 await()流程: 持有锁的线程T1调用await() │ ▼ 加入条件队列 │ ▼ 完全释放锁(state -> 0) │ ▼ park()阻塞 │ 等待signal... │ ▼ 被signal唤醒 │ ▼ 重新获取锁(acquireQueued) │ ▼ 从await()返回 signal()流程: 持有锁的线程T2调用signal() │ ▼ 从条件队列取出节点 │ ▼ 转移到同步队列尾部 │ ▼ unpark()唤醒线程(如果需要) │ ▼ T2继续执行(还持有锁) │ ▼ T2释放锁后,T1才能重新获取
总结 AQS核心设计
组件
作用
state
同步状态,由子类定义语义
CLH队列
管理等待获取锁的线程
条件队列
管理await的线程
Node
封装线程和等待状态
LockSupport
线程阻塞/唤醒原语
模板方法
方法
模式
作用
tryAcquire
独占
尝试获取锁(子类实现)
tryRelease
独占
尝试释放锁(子类实现)
tryAcquireShared
共享
尝试获取共享锁(子类实现)
tryReleaseShared
共享
尝试释放共享锁(子类实现)
isHeldExclusively
独占
是否被当前线程独占
基于AQS的同步器
同步器
模式
state含义
ReentrantLock
独占
锁重入次数
Semaphore
共享
许可数量
CountDownLatch
共享
剩余计数
ReentrantReadWriteLock
独占+共享
高16位读/低16位写
StampedLock
乐观读
版本戳
AQS是Java并发编程的基石,理解它的原理有助于更好地使用和扩展各种同步工具。