Lock and AQS

JUC包

JUC包目录。

title

其中包含了两个子包:atomic以及lock,另外在concurrent下的阻塞队列以及executors,这些类主要是依靠volatile以及CAS实现的

整体结构如图:

title

Lock简介

Lock是一个接口。

与synchronized相比,Lock拥有了锁获取和释放的可操作性,可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。

Lock是一个接口,有许多实现他的类,比如ReentranrLock,但是查看他的源码会发现大部分方法都是在调用他的内部类Sync的方法,而Sync继承了AQS,因此,Lock实现的核心还是AQS。

title

AQS

AQS的设计是使用模板方法设计模式,他将一些相同部分的代码实现,将不同的代码放到不同的子类中去;而且,在AQS的方法中,也会调用子类的代码。模板设计模式

例如:

1
2
3
4
5
6
7
8
9
10
11
//子类需要重写tryAcquire()
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

//但是在AQS的模板方法中又调用了tryAcquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

因此,AQS只需要实现各自不同的tryAquire()就行了,比如是公平锁还是非公平锁,是独占锁还是共享锁。

AQS提供的模板方法可以分为3类:

  • 独占式获取与释放同步状态;

  • 共享式获取与释放同步状态;

  • 查询同步队列中等待线程情况;

AQS的功能分为两种:独占和共享

  • 独占锁,每次只能有一个线程持有锁,比如ReentrantLock就是以独占方式实现的互斥锁.
  • 共享锁,允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock.

AQS实现

AQS中有两个重要的成员,一个是CLH队列,一个是state。

state

AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。

CLH

CLH是一个先进先出的队列。如果当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息构造成一个Node加入到同步队列中,同时再阻塞该线程。当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。

CLH的头节点是空的,啥也不存的。

1
2
3
4
5
6
7
8
9
10
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}

Node

Node代表的是一个正在阻塞等待的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//当前节点处于共享模式的标记
static final Node SHARED = new Node();

//当前节点处于独占模式的标记
static final Node EXCLUSIVE = null;

//线程被取消了
static final int CANCELLED = 1;
//释放资源后需唤醒后继节点
static final int SIGNAL = -1;
//等待condition唤醒
static final int CONDITION = -2;
//工作于共享锁状态,需要向后传播,
//比如根据资源是否剩余,唤醒后继节点
static final int PROPAGATE = -3;

//等待状态,有1,0,-1,-2,-3五个值。分别对应上面的值
volatile int waitStatus;

//前驱节点
volatile Node prev;

//后继节点
volatile Node next;

//等待锁的线程
volatile Thread thread;

//等待条件的下一个节点,ConditonObject中用到
Node nextWaiter;

独占锁的获取

1
2
3
4
5
6
7
8
9
10
//通过子类的tryAcquire()获取锁,不同的子类有不同的实现,要是获取失败,则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),将该线程放入CLH  
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//selfInterrupt只是将该线程状态设置为中断,并没有真正中断,可中断锁还是需要lockinterrutly
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

入队操作:主要采取CAS+自旋的方式,一开始采用CAS快速入队,失败了之后再采用自旋操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//入队操作,mode = Node.EXCLUSIVE,独占锁
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
/*
这段代码进行快速入队,实际上与enq中差不多。这样做的原因是:
把最有可能成功执行的代码直接写在最常用的调用处,因为在线程数不多的情况下,CAS还是很难失败的。因此 这种写法可以节省多条指令。因为调用enq需要一次方法调用,进入循环,比较null,然后才到了红框中一样 的代码。大概类似于内联函数的优化
总而言之,节省指令,提高效率。
*/
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;//记录尾节点
if (t == null) { //由于采用lazy initialize,当队列为空时,需要进行初始化
//通过CAS设置head和tail节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;//将node的前节点设置为原tail节点
if (compareAndSetTail(t, node)) {//CAS更新tail节点,更新成功则将原tail节点的后节点设置为node,返回原tail节点,入列成功;
t.next = node;
return t;
}
}
}

在把node插入队列末尾后,它并不立即挂起该节点中线程,因为在插入它的过程中,前面的线程可能已经执行完成,所以它会先进行自旋操作acquireQueued(node, arg),尝试让该线程重新获取锁!当条件满足获取到了锁则可以从自旋过程中退出,否则继续。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//如果节点的前驱是队列的头节点并且能拿到资源,获取锁成功,结束
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判断当前节点是否应该被挂起。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

判断节点是否应该被挂起,当前驱节点是SIGNAL的时候,直接挂起线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//前驱节点的状态是SIGNAL,说明前驱节点释放资源后会通知自己
//此时当前节点可以安全的park(),因此返回true
return true;
if (ws > 0) {
//前驱节点的状态是CANCLLED,说明前置节点已经放弃获取资源了
//此时一直往前找,直到找到最近的一个处于正常等待状态的节点
//并排在它后面,返回false
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驱节点的状态是0或PROPGATE,则利用CAS将前置节点的状态置
//为SIGNAL,让它释放资源后通知自己
//如果前置节点刚释放资源,状态就不是SIGNAL了,这时就会失败
// 返回false
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}


private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

acquire()整个流程:

  • 调用子类的tryAquire()尝试获取资源,成功,直接返回。失败,继续。

  • 获取失败,将该线程生成一个Node节点通过addWaiter(Node.EXCLUSIVE), arg)添加到等待队列

  • 插入等待队列后,防止在这个阶段资源又有了。根据前置节点状态状态判断是否应该继续获取资源。如果前驱是头结点,继续尝试获取资源;获取成功,返回;否则,继续。

  • 在每一次自旋获取资源过程中,失败后调用shouldParkAfterFailedAcquire(Node, Node)检测当前节点是否应该park()。

    如果前置节点是SIGNAL状态,就挂起,返回true。

    如果前置节点状态为CANCELLED,就一直往前找,直到找到最近的一个处于正常等待状态的节点,并排在它后面,返回false,acquireQueed()接着自旋尝试。

    前置节点处于其他状态,利用CAS将前置节点状态置为SIGNAL。当前置节点刚释放资源,状态就不是SIGNAL了,导致失败,返回false。但凡返回false,就导致acquireQueed()接着自旋尝试。

  • 若返回true,则调用parkAndCheckInterrupt()中断当前节点中的线程。若返回false,则接着自旋获取资源。

  • parkAndCheckInterrupt()挂起线程。

共享锁的获取

共享锁就是同时可以有多个线程访问。实现与独占锁差不多,唯一的不同就是需要判断是否还有剩余资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
//不同之处,判断剩余资源是否大于0
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

公平锁

非公平锁的吞吐量较高例如默认状态的ReentrantLock 有新线程来了先争夺一下锁,没成功再去排队。
公平锁是java关键字synchronized的重锁模式,谁来了都乖乖排队,后来的线程不能争夺锁,一定要入队列等待前一个线程来unpark自己,除非队列里没有其他线程。

中断锁

当线程等待的时候,如果被interrupt(),那么直接抛出中断异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//中断锁抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

超时锁

在获取锁的过程中,超过某一个时长,自动放弃获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//超出时间,直接终止
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

释放锁

首先调用子类的tryRelease()方法释放锁,然后唤醒后继节点,(只唤醒一个节点)在唤醒的过程中,需要判断后继节点是否满足情况,如果后继节点不为空且不是作废状态,则唤醒这个后继节点,否则从tail节点向前寻找合适的节点,如果找到,则唤醒。

ConditionObject

这是AQS的一个内部类,其维护了一个condition队列。主要有await()和signal()等方法。

await()

await():当前线程处于阻塞状态,直到调用signal()或中断才能被唤醒。

  • 将当前线程封装成node且等待状态为CONDITION。
  • 释放当前线程持有的所有资源,让下一个线程能获取资源。
  • 加入到条件队列后,则阻塞当前线程,等待被唤醒。
  • 如果是因signal被唤醒,则节点会从条件队列转移到等待队列;如果是因中断被唤醒,则记录中断状态。两种情况都会跳出循环。
  • 若是因signal被唤醒,就自旋获取资源;否则处理中断异常。

condition队列与CLH最大的不同就是CLH是双向列表,condition队列是单向列表。

CLH是单向列表的原因是可能需要获取前置节点的一些属性,比如说查看前置节点是不是头节点之类的。

ConditionObject详解

ConditionObject用来实现锁的等待通知机制。ConditionObject内部维护了一个等待队列,与CLH不同的是这个队列是单向链表。

与Object wait/notify区别

Object的wait和notify/notify是与对象监视器配合完成线程间的等待/通知机制,而Condition与Lock配合完成等待通知机制,前者是java底层级别的,后者是语言级别的,具有更高的可控制性和扩展性。

Condition能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个。

Condition能够支持超时时间的设置,而Object不支持。

await方法

  • void await() throws InterruptedException:当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;
  • long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时
  • boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个时间

signal方法

  • void signal():唤醒一个等待在condition上的线程(第一个线程,条件队列是一个FIFO的队列),将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。
  • void signalAll():与1的区别在于能够唤醒所有等待在condition上的线程。

await实现原理

await主要做了三件事:

  • 将线程包装成Node,插入到条件队列。
  • 释放线程拥有的锁。
  • 阻塞当前线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 将当前线程包装成Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

插入到条件队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前线程包装成Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
//尾插入
t.nextWaiter = node;
//更新lastWaiter
lastWaiter = node;
return node;
}

与等待队列不同的是,条件队列没有头节点。

释放锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
//成功释放同步状态
failed = false;
return savedState;
} else {
//不成功释放同步状态抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

结束await()状态:

1
2
3
4
5
6
7
8
//当前节点被移动到了同步队列中(即另外线程调用的condition的signal或者signalAll方法),while中逻辑判断为false后结束while循环
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
//当线程被中断,会退出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

AQS详解