目录
本文概览:介绍了await执行过程 和 signal执行过程。
1 引入
以wait/notify之于synchronized的作用(synchronized只是保证多个线程互斥的访问这个实例对象,但是如果需要自定义这些线程顺序,那么此时引入了wait/notify) 来理解condition之于AQS同步器作用:实现控制线程占用同步器AQS的顺序。
2 ConditionObject的数据成员
定义了一个condition队列
1 2 3 4 5 |
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; |
3 ConditionObject的接口
condition接口如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
// 1. 如果线程中断执行await时,会抛出这个异常 void await() throws InterruptedException; // 2. 不对中断状态处理,即无视线程中断 void awaitUninterruptibly(); // 3.等待某个时间,没有等待single,就将当前的线程节点放到条件对列中 boolean await(long time, TimeUnit unit) throws InterruptedException; long awaitNanos(long nanosTimeout) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; // 4.将对列的firstWaiter节点转移到同步器的FIFO对列中 void signal(); // 5.将对列的firstWaiter所有节点转移到同步器的FIFO对列中 void signalAll(); |
3.1 await
1、awati作用
(1)释放当前线程的同步器,把线程节点加入到条件队列中进行阻塞;
(2)触发release操作,AQS 同步队列中Head节点执行acquire操作。
2、执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
public final void await() throws InterruptedException { // 1.如果线程被中断,则不能执行await if (Thread.interrupted()) throw new InterruptedException(); // 2.在对列末尾添加新节点 Node node = addConditionWaiter(); // 3.在await操作之后,就需要释放同步器,让其它线程去获取。 // 为什么使用fullRelease,而不是直接使用release? // 因为full的含义在于当可重入次数多于1次,也一次性释放。 int savedState = fullyRelease(node); int interruptMode = 0; // 判断是否在AQS的FIFO队列中(pre为null或者node状态为CONDITION) while (!isOnSyncQueue(node)) { // 4.执行park LockSupport.park(this); // 5.当此线程执行unpark之后,将此node放到同步对列中 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 6.这里调用acquireQueued作用是什么? // 在acquire函数当线程需要阻塞时需要包括两大步:在FIFO中添加节点和执行acuqireQueued。 // 这里使用的acquireQueued的作用是把这里来当成acquire来处理: // (1)执行single之后,就把线程节点从条件对列移到了FIFO对列 // 这一步相当于上述acquire中第一步:在FIFO对列中新建一个线程节点; // (2)然后我们再执行acquireQueued. if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } |
3.2 awaitUninterruptibly()
就是线程是中断的状态,也可以继续执行。查看awaitUniterruptible()代码比await()代码少了如下逻辑:
1 2 |
if (Thread.interrupted()) throw new InterruptedException(); |
3.3 single
single把条件队列中firstWaiter节点转移到AQS同步队列中,触发await阻塞的线程,从lockSupport.park的位置开始执行acquire。
- singel操作通过transferForSignal函数来处理firstWaiter节点
- transferForSignal的作用就是:将条件对列的节点(每一次都是取FiristWaiter节点),放置到同步器的FIFO对列中。
1、transferForSignal的代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
/** Transfers a node from a condition queue onto sync queue. **/ final boolean transferForSignal(Node node) { // 1.设置node状态 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 2.将节点转移到同步队列 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } |
问题1 为什么要执行 LockSupport.unpark(node.thread)?
为了执行从await操作如下位置开始执行
3.4 await(long time, TimeUnit unit)
1、作用
等待某个时间,超过这个时间,线程结点自动进入到同步器的FIFO对列,不再需要等待singal来执行条件对列节点放入到同步器的FIFO对列中。
2、实现
通过LockSupport.parkNanos(this, nanosTimeout);这方法来实现等待时间。如果超过时间阈值,执行transferAfterCancelledWait函数来讲此线程节点加入到同步器的FIFO对列中,不再需要等待singal操作。
3、其他类似的Condition接口
1 2 3 4 5 |
// 调用 LockSupport.parkUntil(this, abstime); boolean awaitUntil(Date deadline) throws InterruptedException; // 调用LockSupport.parkNanos(this, nanosTimeout); public final boolean await(long time, TimeUnit unit) |
3.4 signalAll()
single就是将条件对列的firstWaiter节点放入到同步器的FIFO对列中。
singleAll就是将条件对列所有线程节点都放置到FIFO对列中。
4 相关问题
4.1 解析Condition的await/single和同步器AQS的acquire/release
1、await实现功能总结
第一步: 当前线程放弃同步器
第二步 :通过LockSupport#park阻塞线程;将线程节点放置到条件队列中。
第三步 :并通过fullRelease来触发同步队列中Head节点。
2、acquire
执行acquire分两种情况:
(1)情况1 可以获取同步器,此时直接运行线程
(2)情况2 没有获取同步器,需要阻塞线程
通过LockSupport#park阻塞线程;将线程节点放置到同步器的FIFO队列中。
3、signal()功能:
就是将条件队列的线程节点放置到同步器的FIFO对列中。执行LockSupport.unpark操作触发await阻塞的操作。
4、release()功能
对于FIFO对列上的节点线程,最终需要执行realse,通过unpark来运行这个节点的线程
4.2 对于wait线程,下次通过notify通知时,如何保证从wait位置执行程序?
通过LockSupport.park和LockSupport.unpark。
1、执行await操作
执行LockSupport.park方法,然后将线程节点加入条件对列
2、执行single操作
将条件对列的firstWaiter节点转移到同步器的FIFO队列。将来通过Lock.unlock()方法来执行这个线程的unpark,从而让这个线程继续执行。
4.3 Conditon队列和AQS中的FIFO对列比较
1. 两个队列的区别
condition对列为单向对列
AbstractQueueSynchronizer的FIFO对列是双向对列
2 一个node节点是否可以即在Condition对列,又在AbstractQueueSynchronizer对列中
可以。可以参考代码来判断这个条件对列的节点是否在同步对列:
1 2 3 |
final boolean isOnSyncQueue(Node node) { .... } |
(1)什么时候放将条件对列中节点放到同步对列
在执行await操作,调用checkInterruptWhileWaiting函数。在checkInterruptWhileWaiting函数中将条件对列的节点放置到同步对列,如下代码
1 2 3 4 5 6 7 |
final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } } |
(2)什么时候将条件对列中在同步对列的节点删除
在执行addConditionWaiter() 函数时,会执行 unlinkCancelledWaiters();代码如下,清空在条件对列中节点状态不是Node.CONDITION的节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } |
(全文完)