AbstractQueuedSynchronizer 的共享锁独占锁实现
AbstractQueuedSynchronizer 可用作为一个同步工具的基础,持有一个volatile int state的属性,表示同步的状态。使用 cas 操作修改 state 保证线程安全。内部使用 LockSupport 来阻塞唤醒线程,维护链表来表示等待锁的队列。
链表的节点结构如下
- int waitStatus 有如下四中状态
SIGNAL -1 : 表示节点的后继节点已经阻塞或即将阻塞,需要信号唤醒
CANCELLED 1 : 因为超时或中断取消等待
CONDITION -2 : 等待某个条件而被阻塞,在条件队列里面
PROPAGATE -3 : 共享模式中使用,表示当前场景下后续的acquireShared能够得以执行
0 : None of the above - Node prev 前驱节点
- Node next 后继节点。
- Node nextWaiter 存储condition队列中的后继节点
- Thread thread 关联的线程实例
独占锁
需要重写如下两个方法
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
获取锁
独占模式在同一时刻只能有一个线程获得锁,调用 acquire 获取锁:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里面首先会调用我们重写的 tryAcquire 方法来尝试获取锁,如果获取失败加入队列,然后调用 acquireQueued 。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在这里面会再次尝试获取锁,即使被阻塞,唤醒之后还会进入这里面尝试获取锁。当能够执行到 parkAndCheckInterrupt 时会被阻塞:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
当被阻塞的线程被唤醒时会执行 Thread.interrupted() 判断线程有没有被中断过,同时会清除中断状态,返回 true or false 。从这里开这种方法对中断好像不太敏感,要等到被唤醒并且抢到锁时才能对中断做出响应。
释放锁
只会唤醒一个线程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先调用我们自己定义的释放锁的方法,如果释放成功,从等待队列中选一个线程唤醒 LockSupport.unpark(s.thread);
关于 tryAcquire tryRelease 的重写,以及公平锁
参展一些现有的实现 tryAcquire tryRelease 中一般需要使用 cas 修改 state 值,然后修改独占模式的运行线程引用。
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int arg) {
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
if (compareAndSetState(1,0)) {
setExclusiveOwnerThread(null);
return true;
}
return false;
}
如果要实现公平锁,一般在 tryAcquire 的 if 语句中加一个 判断当前节点是不是等待最久的节点的函数,让对列中等待最久的节点获得锁,同时禁止不在队列里面的节点获取锁。
tryAcquireNanos
用于实现有限时间的锁等待
关键代码是调用的 doAcquireNanos 函数,关键部分如下:
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
使用 LockSupport.parkNanos(this, nanosTimeout) 实现有限时间挂起。
acquireInterruptibly
可中断的锁等待
关键代码是如下部分:
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
当 parkAndCheckInterrupt 挂起结束返回,参与锁的竞争时,如果被设置过中断则直接抛出中断异常,不是等到获取锁然后在返回时中断。
共享锁
需要重写如下两个方法:
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
获取锁
只要还有锁线程来了就能获取,当锁不够时 tryAcquireShared 返回值小于 0, 然后调用 doAcquireShared 方法。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
在 doAcquireShared 方法中首先添加节点到队列中,然后在进入循环后,如果当前节点是首节点,还会进行一次
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
最后调用 parkAndCheckInterrupt 部分跟 独占锁完全相同。
释放锁
在 doReleaseShared 中会唤醒多个阻塞的线程
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
首先调用我们自己定义的 tryReleaseShared,当 tryReleaseShared 成功之后调用 doReleaseShared 唤醒等待的线程
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// cas 操作
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 选一个线程唤醒
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
tryAcquireShared tryReleaseShared 的重写
因为可以有多个线程持有锁,任何线程都可以释放共享锁,因而不必在重写时设置锁的持有者。大体可以重写为一下形式:
protected int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (compareAndSetState(c, c+releases ))
return nextc == 0;
}
}
protected boolean isHeldExclusively()
这个函数也需要自己实现,不同的业务逻辑有不同的实现方式,如重入锁中有如下实现:
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
在 ThreadPoolExecutor 中有不同的实现
protected boolean isHeldExclusively() {
return getState() != 0;
}