15110 2018-10-04 2020-06-25
前言:这次我们看下Lock接口,主要摘自《Java并发编程的艺术》的Lock接口章节。
一、开篇
开篇之前,我们总结一下用于显式解决多线程冲突的几种手段,大致如下
- 使用synchronized关键字,确保各线程的有序进行
- 使用volatile关键字,确保线程间数据更新可见
- 使用Unsafe类,其中的思想就是CAS(Compare And Swap)+ 自旋
这节提到的Lock接口底层实现原理大致如下
- 一个int类型状态值(用于锁的状态变更)
- 一个双向链表(用于存储等待中的线程)
- 通过CAS来获取状态值修改 + 自旋
希望通过上面的简述,能够减少对Lock接口的一些疑惑。
二、概述
Lock接口提供的的synchronized关键字所不具备的主要特性如下所示
- 尝试非阻塞地获取锁:当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁。
- 能被中断地获取锁:与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放。
- 超时获取锁,在指定的截止时间之前获取锁,如果截止时间到了仍旧无法获取锁,则返回。
Lock是一个接口,它定义了锁获取和释放的基本操作,Lock的API如下
方法名称 | 描述 |
---|---|
void lock() | 获取锁,调用该方法当前线程将会获取锁,当锁获得后,从该方法返回 |
void lockInterruptibly() throws InterruptedException | 可中断地获取锁,和lock方法的不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程 |
boolean tryLock() | 尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false |
boolean tryLock(long time, TimeUnit unit) throws InterruptedException | 超时的获取锁,当前线程在以下3种情况下会返回:1当前线程在超时时间内获得了锁;2当前线程在超时时间内被中断;3超时时间结束,返回false |
void unlock() | 释放锁 |
Condition newCondition() | 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程将释放锁 |
这里先简单介绍一些Lock接口的API,随后将会详细介绍同步器AbstractQueuedSynchronizer以及常用Lock接口的实现ReentrantLock。Lock接口的实现基本都是通过聚合一个同步器的子类来完成线程访问控制的。
三、队列同步器
队列同步器AbstractQueuedSynchronized(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器是实现锁的关键,在锁定实现中聚合同步器,利用同步器实现锁的含义。可以这样理解二者的关系:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
1、队列同步器的接口与示例
同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。
- getState():获取当前同步状态
- setState(int newState):设置当前同步状态
- compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。
同步器可重写的方法与描述如下所示
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) | 共享式获取同步状态, 返回大于等于0的值,表示获取成功,反之,获取失败 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
protected boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程锁占用 |
实现自定义同步组件时,将会调用同步器提供的模板方法,这些(部分)模板方法与描述如下所示
方法名称 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待 |
void acquireInterruptibly(int arg) | 与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回 |
boolean tryAcquireNanos(int arg,long nanos) | 在acquireInterruptibly(int arg)基础上增加超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将返回false,如果获取到了返回true |
void acquireShared(int arg) | 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态 |
void acquireSharedInterruptibly(int arg) | 与acquireShared(int arg)相同,该方法响应中断 |
boolean tryAcquireSharedNanos(int arg,long nanos) | 在acquireSharedInterruptibly(int arg)基础上增加了超时限制 |
boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒 |
boolean releaseShared(int arg) | 共享式的释放同步状态 |
Collection<Thread> getQueuedThreads() | 获取等待在同步队列上的线程集合 |
同步器提供的模板方法基本上分为3类:1独占式获取与释放同步状态、2共享式获取与释放同步状态、3查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。
只有掌握了同步器的工作原理才能更加深入地理解并发包中其他的并发组件,下面通过一个独占锁的示例来深入了解一下同步器的工作原理。
public class Mutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于独占状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
@Override
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0) {
throw new IllegalArgumentException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
从上面代码中不难看出两者之间的关系。
2、队列同步器的实现分析
接下来将从实现角度分析同步器是如何完成线程同步的,主要包括:同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等同步器的核心数据结构与模板方法。
1、同步队列
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把收个节点中的线程唤醒,使其再次尝试获取同步状态。
同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点,节点的属性类型与名称以及描述如下所示
属性类型与名称 | 描述 |
---|---|
int waitStatus | 等待状态 |
Node prev | 前驱节点,当节点加入同步队列时被设置(尾部添加) |
Node next | 后继节点 |
Node nextWaiter | 等待队里中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用一个字段 |
Thread thread | 获取同步状态的线程 |
其中等待状态包含一下状态
- CANCELLED,值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化。
- SIGNAL,值为-1,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
- CONDITION,值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。
- PROPAGATE,值为-3,表示下一次共享式同步状态获取将会无条件地传播下去。
- INITIAL,值为0,初始状态。
节点是构成同步队列的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的基本结构如下
同步器包含了两个节点类型的引用,一个指向节点,而另一个执行尾节点。试想一下,当一个线程成功地获取同步状态(或者锁),其他线程将无法获取到同步状态,转而被构成成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步队列提供一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
同步器将节点加入到同步队列的过程如下所示
同步节点遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而这个后继节点将会在获取同步状态成功时将自己设置为首节点,如下所示
设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头结点的方法并不需要使用CAS来保证,它只需要将首节点设置成原首节点的后继节点并断开原首节点的next引用即可。
2、独占式同步状态获取与释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出,该方法代码如下
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全地获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXECLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node, int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
/**
* Creates and enqueues node for current thread and given mode.
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
上述代码通过使用compareAndSetTail(Node expect, Node update)方法来确保节点能够被线程安全添加。试想一下:如果使用一个普通的LinkedList来维护节点之间的关系,那么当一个线程获取了同步状态,而其他多个线程由于调用tryAcquire(int arg)方法获取同步状态失败而并发地被添加到LinkedList时,LinkedList将难以保证Node的正确添加,最终的结果可能是节点的数量有偏差,而且顺序也是混乱的。
在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。可以看出,enq(final Node node)方法将并发添加节点的请求通过CAS变得“串行化”了。
节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程),如下代码所示
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 关键代码
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acquireQueued(final Node node, int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头结点的才能够尝试获取同步状态,这是为什么?原因有两个,如下
- 头结点是成功获取到同步状态的节点,而头结点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头结点。
- 维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态的行为如下所示
在上图中,由于非首节点线程前驱节点出队或者被中断而从等待状态返回,随后检查自己的前驱是否是头节点,如果是则尝试获取同步状态。可以看到节点和节点之间在循环检查的过程中基本不互相通信,而是简单地判断自己的前驱是否为头结点,这样就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头结点的线程由于中断而被唤醒)。
独占式同步状态获取流程,也就是acquire(int arg)方法调用流程,如下所示
前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是获取同步状态的自旋过程。当同步状态获取成功之后,当前线程从acquire(int arg)方法返回,如果对于锁这种并发组件而言,代表了当前线程获取了锁。
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。该方法代码如下
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
该方法执行时,会唤醒头结点的后继节点线程,unparkSuccessor(Node node)方法使用LockSupport来唤醒处于等待状态的线程。
分析了独占式同步状态获取和释放过程后,适当做个总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
3、共享式同步状态获取与释放
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。以文件的读写为例,如果一个程序在对文件进行读操作时,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。写操作要求对资源的独占式访问,而读操作可以是共享式访问,两种不同的访问模式在同一时刻对文件或资源的访问情况,如下所示
通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态,该方法代码如下
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)方法返回值大于等于0。可以看到,在doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱是头结点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。
与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以释放同步状态,该方法代码如下
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(比如Semaphone),它和独占式主要区别在于tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,一般通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程。
4、独占式超时获取同步状态
通过调用同步器的doAcquireNanos(int arg, long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。该方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。
在分析该方法的实现前,先介绍一下响应中断的同步状态获取过程。在Java5之前,当一个线程获取不到锁而被阻塞在synchronized之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在synchronized上,等待着获取锁。在Java5中,同步器提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出InterruptedException。
超时获取同步状态过程可以被视为相应中断获取同步状态过程的“增强版”,doAcquireNanos(int arg, long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的特性。针对超时获取,主要需要计算出需要睡眠的时间间隔nanosTimeout,为了防止过早通知,nanosTimeout计算公式为:nanosTimeout -= now - lastTime,其中now为当前唤醒时间,lastTime为上次唤醒时间,如果nanosTime大于0则表示超时时间未到,需要继续睡眠nanosTimeout纳秒,反之,表示已经超时,该方法代码如下
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
该方法在自旋过程中,当节点的前驱节点为头结点时尝试获取同步状态,如果获取成功则从该方法返回,这个过程和独占式同步获取的过程类似,但是在同步状态获取失败的处理上有所不同。如果当前线程获取同步状态失败,则判断是否超时(nanosTimeout小于等于0表示已经超时),如果没有超时,重新计算超时间隔nanosTimeout,然后使当前线程等待nanosTimeout纳秒(当已到设置的超时时间,该线程会从LockSupport.parkNanos(Object blocker, long nanos)方法返回)。
如果nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒)时,将不会使该线程进行等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。因此,在超时非常短的情景下,同步器会进入无条件的快速自旋。
独占式超时获取同步状态的流程如下所示
独占式获取同步状态doAcquireNanos(int arg, long nanosTimeout)和独占式获取同步状态acquire(int args)在流程上非常相似,其主要区别在于未获取到同步状态时的处理逻辑。acquire(int args)在未获取到同步状态时,将会使当前线程一直处于等待状态,而doAcquireNanos(int arg, long nanosTimeout)会使当前线程等待nanosTimeout纳秒,如果当前线程在nanosTimeout纳秒内没有获取到同步状态,将会从等待逻辑中自动返回。
5、自定义同步组件
本节通过编写一个自定义同步组件来加深对同步器的理解。
设计一个同步工具:该工具在同一时刻,只允许至多两个线程同时访问,超过两个线程的方法将被阻塞,我们将这个同步工具命名为TwinsLock。
首先,确定访问模式。TwinsLock能够同一时刻支持多个线程的访问,这显然是共享式访问,因此,需要使用同步器提供的acquireShared(int args)方法等和Shared相关的方法,这就要求TwinsLock必须重写tryAcquireShared(int args)方法和tryReleaseShared(int args)方法,这样才能保证同步器共享式同步状态的获取与释放得以执行。
其次,定义资源数。TwinsLock在同一时刻允许至多两个线程的同时访问,表明同步资源数为2,这样可以设置初始状态status为2,当一个线程进行获取,status减1,该线程释放,则status加1,状态的合法范围为0、1和2,其中0表示当前已经有两个线程获取了同步资源,此时再有其他线程对同步状态进行获取,该线程只能被阻塞。在同步状态变更时,需要使用compareAndSet(int expect, int update)方法做原子性保障。
最后,组合自定义同步器。代码如下
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("参数必须大于0");
}
setState(count);
}
@Override
public int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override
public boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return null;
}
public static void main(String[] args) {
final Lock lock = new TwinsLock();
class Worker extends Thread {
@Override
public void run() {
while (true) {
lock.lock();
try {
ThreadState.SleepUtils.second(1);
System.out.println(Thread.currentThread().getName());
ThreadState.SleepUtils.second(1);
} finally {
lock.unlock();
}
}
}
}
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.setDaemon(true);
w.start();
}
for (int i = 0; i < 10; i++) {
ThreadState.SleepUtils.second(1);
System.out.println();
}
}
}
下面来看一下JDK提供的几个现成的工具类。
四、重入锁
重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁还支持获取锁时的公平和非公平性选择。
回忆在同步器一节中的示例(Mutex),同时考虑如下场景:当一个线程调用Mutex的lock()方法获取锁之后,如果再次调用lock()方法,则该线程将会被自己阻塞,原因是Mutex在实现tryAcquire(int acquires)方法时没有考虑占有锁的线程再次获取锁的场景,而在调用tryAcquire(int acquires)方法时返回了false,导致该线程被阻塞。简单地说,Mutex是一个不支持重进入的锁。而synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁,而不像Mutex由于获取了锁,而在下一次获取锁时出现阻塞自己的情况。
ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进入,但是在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。
这里提到一个锁获取公平性的问题,如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以锁获取的顺序的。ReentrantLock提供了一个构造方法,能够控制锁是否是公平的。
事实上,公平的锁机制往往没有非公平的效率高,但是,并不是任何场景都是以TPS作为唯一的指标,公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到有限满足。
下面将着重分析ReentrantLock是如何实现重进入和公平性获取锁的特性,并通过测试来验证公平性获取锁对性能的影响。
1、实现重进入
重进入是指任意线程在获取到锁以后能够再次获取该锁而不会被锁阻塞,该特性的实现需要解决一下两个问题
- 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
- 锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。
ReentrantLock是通过组合自定义同步器来实现锁的获取与释放,以非公平性(默认的)实现为例,获取同步状态的代码如下
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。
成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求ReentrantLock在释放同步状态时减少同步状态值,该方法代码如下
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
如果该锁被获取了n次,那么前(n - 1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
2、公平与非公平获取锁的区别
公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。
回顾先前提到的nonfairTryAcquire(int acquires)方法,对于非公平锁,只有CAS设置成功,则表示当前线程获取了锁,而公平锁则不同,代码如下
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
该方法与nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。
下面编写一个测试来观察公平和非公平锁在获取锁时的区别,在测试用例中定义了内部类ReentrantLock2,该类主要公开了getQueuedThreads()方法,该方法返回正在等待获取锁的线程列表,由于列表是逆序输出,为了方便观察结果,将其进行翻转,代码如下
public class ReentrantTest {
private static Lock fairLock = new ReetrantLock2(true);
private static Lock unfairLock = new ReetrantLock2(false);
@Test
public void fair() {
System.out.println("公平锁测试");
testLock(fairLock);
}
@Test
public void unfair() {
System.out.println("非公平锁测试");
testLock(unfairLock);
}
private void testLock(Lock lock) {
for (int i = 0; i < 6; i++) {
Thread thread = new Job(lock);
thread.start();
}
}
private static class Job extends Thread {
private Lock lock;
public Job(Lock lock) {
this.lock = lock;
}
@Override
public void run() {
while (true) {
lock.lock();
try {
ReetrantLock2 temp = (ReetrantLock2) lock;
System.out.println("Lock by [" + getName() + "],Waiting by " + temp.getQueuedThreads());
System.out.println("Lock by [" + getName() + "],Waiting by " + temp.getQueuedThreads());
} finally {
lock.unlock();
}
}
}
@Override
public String toString() {
return getName();
}
}
private static class ReetrantLock2 extends ReentrantLock {
public ReetrantLock2(boolean fair) {
super(fair);
}
@Override
public Collection<Thread> getQueuedThreads() {
List<Thread> list = new ArrayList<>(super.getQueuedThreads());
Collections.reverse(list);
return list;
}
}
}
五、读写锁
之前提到锁(如Mutex和RentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
除了保证写操作对读操作的可见性以及并发性的提升之外,读写锁能够简化读写交互场景的编程方式。假设在程序中定义一个用作共享的缓存数据结构,它大部分时间提供读服务,而写操作占有的时间很少,但是写操作完成之后的更新需要对后续的读服务可见。
在没有读写锁支持的时候(Java5之前),如果需要完成上述工作就要使用Java的等待通知机制,就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能执行(写操作之间依靠synchronized关键字进行同步),这样做的目的是使读操作能读到正确的数据,不会出现脏读。改用读写锁实现上述功能,只需要在读操作时获取读锁,写操作时获取写锁即可。当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行,编程方式对于使用等待通知机制的实现方式而言,变得简单明了。
一般情况下,读写锁的性能都会比其他排他锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排他锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是ReentrantReadLock,它提供的特性如下
特性 | 说明 |
---|---|
公平性选择 | 支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平 |
重进入 | 该锁支持重进入,以读写锁为例:该线程在获取了读锁之后,能够再次获取读锁。而写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁。 |
锁降级 | 遵循获取写锁、获取读锁在释放写锁的次序,写锁也能够降级为读锁 |
1、读写锁的接口与示例
ReadWriteLock仅定义了获取读锁和写锁的两个方法,即readLock()方法和writeLock()方法,而其实现—ReentrantReadLock,除了接口方法之外,还提供了一些便于外界监控其内部工作状态的方法,如下所示
方法名称 | 描述 |
---|---|
int getReadLockCount() | 返回当前读锁被获取的次数。该次数不等于获取读锁的线程数,例如,仅一个线程,它连续获取(重进入)了n次读锁,那么占据读锁的线程数是1,但该方法返回n |
int getReadHoldCount() | 返回当前线程获取读锁的次数。该方法在Java 6中加入到ReentrantReadWriteLock中,使用ThreadLocal保存当前线程获取的次数,这也使得Java 6的实现变得更加复杂 |
boolean isWriteLocked() | 判断写锁是否被获取 |
int getWriteHoldCount() | 返回当前写锁被获取的次数 |
接下来,通过一个缓存示例说明读写锁的使用方式
public class Cache {
static Map<String, Object> map = new HashMap<>();
static ReentrantReadWriteLock rrwl = new ReentrantReadWriteLock();
static Lock r = rrwl.readLock();
static Lock w = rrwl.writeLock();
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
2、读写锁的实现分析
接下来分析ReentrantReadWriteLock的实现,主要包括:读写状态的设计、写锁的获取与释放、读锁的读取与释放以及锁降级(以下没有特别说明读写锁均可认为是ReentrantReadWriteLock)。
1、读写状态的设计
读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。回想ReentrantLock中自定义同步器的实现,同步状态表示被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。
如果在一个整形变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。
当前同步状态表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。读写锁是如何迅速确定读和写各自的状态呢?答案是通过位运算。假设当前同步状态值为S,写状态等于S & 0x0000FFFF(将高16位全部抹去),读状态等于S >>> 16(无符号补0右移16位)。当写状态增加1时,等于S + 1,当读状态增加1时,等于S + (1 << 16),也就是S + 0x00010000。
根据状态的划分能得出一个推论:S不等于0时,当写状态(S & 0x0000FFFF)等于0时,则读状态(S >>> 16)大于0,即读锁已被获取。
2、写锁的获取与释放
写锁是一个支持重进入的排他锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态,获取写锁代码如下
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
该方法除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的读取,那么正在运行的其他读线程就无法感知到当前线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦获取,则其他读写线程的后续访问均被阻塞。
写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。
3、读锁的获取与释放
读锁是一个支持重进入的共享锁,它能被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全时)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。获取读锁的实现从Java 5到Java 6变得复杂许多,主要原因是新增了一些功能,例如getReadHoldCount()方法,作用是返回当前线程获取读锁的次数。读状态是所有线程读取读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLocal中,由于线程自身维护,这使得获取读锁的实现变得复杂。代码如下
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
在tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。
读锁的每次释放(线程安全的,可能多个线程同时释放读锁)均减少读状态,减少值是1 <<< 16。
4、锁降级
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后在获取读锁,这种分段的过程不能称之为锁降级。锁降级是指保持住当前写锁,再获取到读锁,随后释放先前拥有写锁的过程。
接下来看一个锁降级的示例。因为数据不常变化,所以多个线程可以并发地进行数据处理,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他处理线程被阻塞,直到当前线程完成数据的准备工作,代码如下
public void processData() {
readLock.lock();
if (!update) {
// 必须先释放锁
readLock.unlock();
// 锁降级从写锁获取开始
writeLock.lock();
try {
if (!update) {
// 准备数据的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
}
try {
// 使用数据的流程(略)
} finally {
readLock.unlock();
}
}
上述示例中,当数据发生变更后,update变量(布尔类型且volatile修饰)被设置成为false,此时所有访问processData()方法的线程都能够感知到变化,但只有一个线程能够获取到写锁,其他线程会被阻塞在读锁和写锁的lock()方法上。当前线程获取写锁完成数据准备之后,再获取读锁,随后释放写锁,完成锁降级。
锁降级中读锁的获取是否必要呢?答案是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设刺客另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。即遵循锁降级的步骤,则线程T将会被阻塞,知道当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。
ReentrantReadWriteLock不支持锁升级(把持读锁、获取写锁、最后释放读锁的过程)。目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其跟新对其他获取到读锁的线程是不可见的。
六、LockSupport工具
之前需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作。LockSupport定义了一组公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而LockSupport也成为构建同步组件的基础工具。
LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程。方法描述如下
方法名称 | 描述 |
---|---|
void park() | 阻塞当前线程,如果调用unpark(Thread thread)方法或者当前线程被中断,才能从park()方法返回 |
void parkNanos(long nanos) | 阻塞当前线程,最长不超过nanos纳秒,返回条件在park()的基础上增加了超时返回 |
void parkUntil(long deadline) | 阻塞当前线程,知道deadline时间(从1970年开始到deadline时间的毫秒数) |
void unpark(Thread thread) | 唤醒处于阻塞状态的线程 |
在Java 6中LockSupport增加了park(Object blocker)、parkNanos(Object, long nanos)和parkUntil(Object blocker, long deadline)3个方法,用于实现阻塞当前线程的功能,其中参数blocker是用来标识当前线程在等待的对象(以下简称阻塞对象),该对象主要用于问题排查和系统监控。
七、Condition接口
任意一个Java对象,都拥有一组监视器方法(定义在Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。
通过对比Object的监视器方法和Condition接口,可以更详细地了解Condition的特性,对比如下
对比项 | Object Monitor Methods | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock()获取锁,再调用lock.newCondition()获取Condition对象 |
调用方式 | 直接调用,如object.wait() | 直接调用,如condition.await() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队里中的一个线程 | 支持 | 支持 |
唤醒等待队里中的全部线程 | 支持 | 支持 |
1、实例演示
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象创建出来的,换句话说,Condition是依赖Lock对象的。
Condition的使用方式比较简单,需要注意在调用方法前获取锁,使用方式如下
public class ConditionTest {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSingal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
}
如示例所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回之前已经获取了锁。Condition定义的部分方法及描述如下
方法名称 | 描述 |
---|---|
void await() throws InterruptedException | 当前线程进入等待状态知道被通知(signal)或中断 |
void awaitUninterruptiby() | 当前线程进入等待状态直到被通知,从方法名称上可以看出该方法对中断不敏感 |
long awaitNanos(long nanosTimeout) throws InterruptedException | 当前线程进入等待状态直到被通知、中断或超时。返回值表示剩余时间,如果在nanosTimeout纳秒之前被唤醒,那么返回值就是nanosTimeout - 实际耗时。如果返回值是0或负数,那么可以认定已经超时了 |
boolean awaitUntil(Date deadline) throws InterruptedException | 当前线程进入等待状态直到被通知、中断或者到某个时间。如果没有到指定时间就被通知,方法返回true,否则,表示到了指定时间,方法返回false |
void signal() | 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁 |
void signalAll() | 唤醒所有等待在CONDITION上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁 |
获取一个Condition必须通过Lock的newCondition方法。下面通过一个有界队列的示例来深入了解Condition的使用方式。有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现“空位”,代码如下
public class ConditionTest1 {
ReentrantLock lock = new ReentrantLock(false);
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
int[] datas = new int[4];
int size = 0;
private void put(int i) {
lock.lock();
try {
while (size == datas.length) {
notFull.await();
}
datas[size++] = i;
System.out.println("存入" + i);
notEmpty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private int take() {
lock.lock();
try {
while (size == 0) {
notEmpty.await();
}
int t = datas[--size];
System.out.println("取出" + t);
notFull.signal();
return t;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return -1;
}
public static void main(String[] args) {
ConditionTest1 test = new ConditionTest1();
new Thread(new Runnable() {
@Override
public void run() {
test.put(1);
test.put(2);
test.put(3);
test.put(4);
test.put(5);
}
}).start();
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
while (true) {
Thread.sleep(10);
test.take();
}
}
}).start();
}
}
// 输出如下
存入1
存入2
存入3
存入4
取出4
存入5
取出5
取出3
取出2
取出1
回想之前提到的等待/通知的经典范式,二者是非常类似的。用等待/通知实现代码如下
public class BoundedQueue1<T> {
Object[] items;
Object lock = new Object();
private int addIndex, removeIndex, count;
public BoundedQueue1(int size) {
items = new Object[size];
}
public void add(T t) {
synchronized (lock) {
while (count == items.length) {
System.out.println("已到达最大数量,不能再添加");
try {
lock.wait();
} catch (InterruptedException e) {
}
}
items[addIndex] = t;
System.out.println("放入位置" + addIndex + "成功:" + t);
if (++addIndex == items.length) {
addIndex = 0;
}
++count;
lock.notify();
}
}
public void remove() {
synchronized (lock) {
while (count == 0) {
System.out.println("数量为0,等待添加");
try {
lock.wait();
} catch (InterruptedException e) {
}
}
T x = (T) items[removeIndex];
System.out.println("从位置" + removeIndex + "取出成功:" + x);
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
lock.notify();
}
}
public static void main(String[] args) {
BoundedQueue1<Integer> queue = new BoundedQueue1<>(3);
Thread put = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
queue.add(new Integer(i));
}
}
});
Thread get = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
queue.remove();
}
}
});
put.setDaemon(true);
get.setDaemon(true);
put.start();
get.start();
ThreadState.SleepUtils.second(1);
}
}
// 从运行结果来看,两者实现的功能确实是一样的
放入位置0成功:0
放入位置1成功:1
放入位置2成功:2
已到达最大数量,不能再添加
从位置0取出成功:0
放入位置0成功:3
已到达最大数量,不能再添加
从位置1取出成功:1
放入位置1成功:4
从位置2取出成功:2
从位置0取出成功:3
从位置1取出成功:4
数量为0,等待添加
2、Condition的实现分析
Condition是同步器AbstractQueuedSynchronized的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。
下面将分析Condition的实现,主要包括:等待队列、等待和通知,下面提到的Condition如果不加说明均值的是ConditionObject(一个实现类)。
1、等待队列
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。
一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如下
如图所示,Condition拥有尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列,其对应关系如下
如图所示,Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。
2、等待
调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。代码如下
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。
如果从队列的角度去看,当前线程加入Condition的等待队列,该过程如图
如图所示,同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中。
3、通知
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。代码如下
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
节点从等待队列移动到同步队列的过程如图
通过调用同步器的enq(Node node)方法,等待队列中的头结点线程安全地移动到同步队列。当节点移到到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。
被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器acquireQueued()方法加入到获取同步状态的竞争中。
成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。
八、小结
怎么说呢,在某些特定情况下(追求极致性能 或 复杂的线程场景下),Lock接口是个不错的选择。但如果只是在一般的线程应用中,个人感觉synchronized是可以满足大部分场景的。
总访问次数: 230次, 一般般帅 创建于 2018-10-04, 最后更新于 2020-06-25
欢迎关注微信公众号,第一时间掌握最新动态!