Java多線程第十篇--通過CountDownLatch再探AbstractQueuedSynchronizer的共享模式
highlight: a11y-dark
我正在參與掘金技術社區創作者簽約計劃招募活動,點擊鏈接報名投稿。
上一篇中,我們一起學習了ReentrantLock的獨佔式公平上鎖的過程,從中我們深入瞭解了AbstractQueuedSynchronizer(AQS) 同步隊列的源碼以及Condition等待隊列的實現機制。
這一篇中,我們再從另一個實現類CountDownLatch再來看看AQS共享模式的源碼分析。
先來看看CountDownLatch。
CountDownLatch是啥?怎麼用?如何工作的?
關於CountDownLatch的名字,網上眾説紛紜:倒計時器,倒數的門閂,同步計數器等等。它究竟是什麼呢?
場景示例
下面直接上demo,以及場景來説明CountDownLatch的用法和工作原理: ```java public class CountDownLatchDemo extends Thread {
public CountDownLatchDemo(String name) {
this.setName(name);
}
/**
* 本例是一個模仿三人歡樂鬥地主的遊戲,當三個玩家都準備好的時候,遊戲就開始發牌了
*/
static CountDownLatch countDownLatch = new CountDownLatch(3);
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "進入遊戲房間~");
try {
// 模擬玩家進入房間準備的過程
Random random = new Random();
int time = (random.nextInt(10) + 1);
for (int i = 0; i < time; i++) {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "準備中。。。");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "已進入房間並且準備好了~");
countDownLatch.countDown();
}
public static void main(String[] args) throws InterruptedException {
System.out.println("遊戲開始,等待玩家進入遊戲房間~");
CountDownLatchDemo player1 = new CountDownLatchDemo("玩家1");
CountDownLatchDemo player2 = new CountDownLatchDemo("玩家2");
CountDownLatchDemo player3 = new CountDownLatchDemo("玩家3");
player1.start();
player2.start();
player3.start();
countDownLatch.await();
System.out.println("所有玩家都已進入遊戲房間,開始發牌了~~!!");
}
}
運行效果如下:
java
遊戲開始,等待玩家進入遊戲房間~
玩家1進入遊戲房間~
玩家3進入遊戲房間~
玩家2進入遊戲房間~
玩家3準備中。。。
玩家3已進入房間並且準備好了~
玩家1準備中。。。
玩家2準備中。。。
玩家1準備中。。。
玩家2準備中。。。
玩家1準備中。。。
玩家2準備中。。。
玩家2準備中。。。
玩家2已進入房間並且準備好了~
玩家1準備中。。。
玩家1準備中。。。
玩家1已進入房間並且準備好了~
所有玩家都已進入遊戲房間,開始發牌了~~!!
```
代碼場景描述:這段代碼很簡單,我們平時都玩歡樂鬥地主的手機遊戲吧,示例就模擬了這個場景:平台開了一個房間,然後等待三個玩家進入房間並準備,當三個玩家都準備好了,就開始發牌進入遊戲~我們做如下假設:平台並不知道玩傢什麼時候進入房間並且準備好,這時候平台準備了一個倒數的計數器,初始值為3,當有一個玩家進入房間並點擊準備好了後,計數器則減一,當計數器等於0的時候,則説明房間三人已滿,啟動發牌指令,遊戲開始!
CountDownLatch工作原理
從這個簡單的例子來看,我們的主角CountDownLatch好像一個計數器,當計數器的數值大於0時,所有調用await方法的線程都會等待(當然,例子中是main主線程),當其他線程達成某些條件,或者任務做完了,則會將計數器減一,當計數器的值變為0 的時候,所有調用await的線程都將被喚醒,繼續執行下面的業務。
其實在之前的文章的開頭部分講線程間協作的方法的時候就做過説明,CountDownLatch是可以使得多個線程協作完成一個或者多個任務的一個工具類,用來協調多個線程間的同步,可以起到線程間通信的作用。
CountDownLatch使用場景
CountDownLatch的使用場景真的很多,總結如下: - 當一個線程A需要其他一些線程的執行結果,A線程才能繼續執行的場景 - 比如上面的三人鬥地主的場景;再比如我們在寫Service層一個大型查詢統計的時候,這個業務需要統計好幾張大表,每張表統計都需要很長一段時間,如果按照順序一個個統計,則是時間累計;而當我們使用CountDownLatch,再new幾個線程,每個線程各執行一個查詢,每當一個查詢線程做完,將計數器減一,最後為0的時候,主線程則彙總各個線程的結果即可。 - 可以實現指定幾個線程並控制這些線程並行執行。 注意:這裏是並行,而不是併發。 - 這裏的意思就是你可以指定這個任務幾個線程去共同完成,並且可以控制他們同時進行,從而減少時間的浪費和性能的提升。
CountDownLatch三大方法分析
以上通過例子説了CountDownLatch的工作原理以及如何使用,下面將進入我們的主題,源碼分析:透過CountDownLatch來看AQS共享方式獲取同步資源。
以下便是CountDownLatch的源碼實現了,其實很簡單: 其中,自定義同步器的方法具體實現見下圖: 由此可見,要搞明白CountDownLatch的實現原理,還是要看AQS中是如何通過共享方式同步資源的。
繼續,Sync作為AQS的子類且作為CountDownLatch的內部類,這種實現方式幾乎和重入鎖一樣。從Sync的源碼可以看出,實現的是共享式獲取tryAcquireShared和釋放tryReleaseShared,這就證明了CountDownLatch是共享式鎖的典型實現。
由上面的Demo場景分析,我們只需要分析三大函數具體做了啥就可以搞清楚啦。
遊戲平台定義計數器,構造函數public CountDownLatch(int count)
java
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
這是CountDownLatch的構造函數,我們注意看註釋部分對count的解釋,中文意思大概如下:在線程通過await方法之前,必須調用countDown()方法。簡單理解就是就是想要調用await方法的線程被喚醒,只有調用countdown方法使得count變為0。
我們繼續深入此方法,此方法最終其實是初始化自定義同步器Sync,如下:
java
Sync(int count) {
setState(count);
}
到此我們可以發現,CountDownLatch構造器最後其實就是初始化了AQS同步器並且對state值進行了設置,這個值即為CountDownLatch的同步資源的個數。
主線程調用await方法
這時通過打斷點的方式,我們得到了下面的調用路徑,await()-> sync(AQS).acquireSharedInterruptibly(1)->tryAcquireShared(arg),由此我們先看AQS.acquireSharedInterruptibly方法:
java
//此時傳入的arg==1
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//首先判斷當前線程是否中斷,如果中斷則直接拋出異常
if (Thread.interrupted())
throw new InterruptedException();
/**
調用tryAcquireShared方法,如果返回值小於0,則執行doAcquireSharedInterruptibly方法,
大概意思就是如果嘗試獲取鎖資源成功的話,就會返回,否則將要進行排隊,或許會進入阻塞等環節。
即tryAcquireShared(arg)小於0,代表線程獲取共享鎖失敗
大於0,代表當前線程獲取鎖成功,接下來的線程能夠成功獲取看情況
等於0,則代表當前線程獲取成功,接下來的線程肯定就會失敗了
*/
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
到這裏,我們再結合場景和CountDownLatch分析:
場景1
main線程執行await()方法,此時沒有玩家準備好,即沒有線程執行countDown方法,則方法執行到Sync.tryAcquireShared方法,此時state=3,則tryAcquireShared返回-1,則【if (tryAcquireShared(arg) < 0)】返回true,進入到方法doAcquireSharedInterruptibly(arg),見如下源碼實現以及底下的分析:
java
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//1
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//2
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 1、這時主線程main進來,調用addWaiter方法進行node包裝
- 此處addWaiter方法和之前的講獨佔鎖幾乎一樣,做的事情大概如下:封裝線程節點,判斷隊列是否初始化,沒有的話,先初始化同步隊列,然後通過尾插入的方式,將當前節點插入到同步隊列
- 2、取出當前節點的前置節點,判斷前置節點是否為頭節點,此時隊列裏只有主線程節點,很明顯當前主線程節點的前置節點就是head,所以又會再次tryAcquireShared,如果這時還是沒有玩家線程執行countdown方法,很明顯【if (r >= 0)】不會成立,則進入到分支【if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())】,這一段的實現幾乎和上一篇的一樣,就是將前置節點的ws狀態改為SIGNAL,然後正式進入同步隊列後執行park等待(當然這裏面也有個自旋再次嘗試獲取同步資源的執行,我們假設還是不行)。
到此main線程,就進入等待狀態了。
場景2
main主線程處於park等待狀態後,這時玩家1準備好了執行countDown方法,這時我們看看執行countDown方法後會發生什麼?調用邏輯如下countDown-> sync.releaseShared(1)->AQS.releaseShared(1)->Sync.tryReleaseShared(1),先來看看AQS.releaseShared(1)
java
//AQS.releaseShared(1)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
這時【if (tryReleaseShared(arg))】返回true,執行doReleaseShared(),如果返回false,則直接返回false
再來看看Sync.tryReleaseShared(1)
java
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
上段源碼大概意思如下:進入一個自旋過程,如果取出的state等於0,則説明同步資源此時已經全部被減掉了,直接返回false結束,不做任何處理;如果不等於0,則進行CAS state-1操作,直到成功減1,並且返回減1後的值是否為0,如果為0,則返回true,如果不為0,則返回false。
按照場景2的分析,此時玩家1執行countdown後,最終會在【releaseShared(int arg)】方法返回false,不做任何處理,這樣合情合理。
場景3
玩家1執行後,緊接着玩家2也執行了countDown方法,然後這時玩家3也執行了countDown方法。
在按照上面源碼的分析我們可以知道,當玩家3執行countDown方法後,這時tryReleaseShared(int releases)返回true,則執行AQS.doReleaseShared()方法,我們來看下:
java
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
這段代碼其實很複雜,我們先結合場景把流程走下去,玩家3進入該方法,取出頭節點,且此時頭節點ws==SIGNAL,所以執行【compareAndSetWaitStatus(h, Node.SIGNAL, 0)】將頭節點的狀態CAS方式修改成0,如果失敗的話,將繼續來一遍,成功的話,則執行【unparkSuccessor(h)】,這個方法就是喚醒頭節點的下一個節點。
這時主線程節點將被喚醒,則會進入進入到doAcquireSharedInterruptibly方法的循環體中,如下:
```java
/*
* Acquires in shared interruptible mode.
* @param arg the acquire argument
/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這時主線程節點再次執行tryAcquireShared(arg)的返回結果為1,即【if (r >= 0)】成立,則執行這個分支下的語句,即【setHeadAndPropagate(node, r);p.next = null;failed = false;return;】首先是setHeadAndPropagate(node, r),如下:
java
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
``` 將主線程節點設為頭節點後,如果可能的話,將繼續喚醒後繼的節點,去嘗試加鎖,為什麼繼續喚醒呢,因為是共享鎖嘛,當前的釋放了,那説明後繼等待的又可以繼續獲取鎖了,這邊跟獨佔鎖不一樣,獨佔鎖只能是當前獲取鎖的線程釋放後,才能喚醒下一個,而共享鎖是可以繼續後繼節點來嘗試獲取資源。而顯然,按照我們當前場景,主線程從park狀態出來,則程序就繼續往下執行了,到此執行結束。
以上便是CountDownLatch的構造器/await/countDown的流程分析了,我們來總結下: - CountDownLatch三大函數:構造器對計數器初始值的設定;await方法能讓線程進入等待狀態,繼續執行的條件是計數器變為0;countdown方法則是將計數器進行減1操作,當減到為0就會喚醒等待的線程。 - CountDownLatch內部自定義了Sync的同步器,實現了共享式的tryAcquireShared和tryReleaseShared兩個方法,分別用於獲取共享鎖和釋放共享鎖。 - tryAcquireShared:如果state為0,則返回1,即當計數器變為0的時候,全部線程都可以嘗試得到共享鎖,而當計數器非0的時候,返回-1,則將線程交給AQS同步隊列管理。 - tryReleaseShared:主要是通過自旋的形式進行減1操作,如果減1之後發現計數器變為0了,則會進行釋放鎖並喚醒等待線程的操作。
AQS的共享鎖模式
在上面分析CountDownLatch的時候,關於AQS共享鎖的源碼分析都是一筆帶過,有很多的細節沒摳,很複雜的~因為説多了,反而不利於大家理解CountDownLatch的原理,下面我們就着重來看下AQS的共享鎖模式。
在前面篇章的渲染下,我們就不寫demo來看了,其實如果你搞懂了前篇文章獨佔鎖的上鎖和釋放鎖的流程,再來看共享鎖,我相信會容易很多,因為很多地方都是類似的。
共享鎖VS獨佔鎖
共享鎖和獨佔鎖最大區別就是,獨佔鎖是當前只有一個線程可以持有鎖,從源碼就可以看出AQS有個屬性exclusiveOwnerThread,就是用來存放當前持有鎖的線程對象,這時其他線程來嘗試加鎖,只能進行排隊等待,只有等持有鎖的線程釋放了,才會喚醒隊列中排在第一個的線程進行嘗試加鎖,當然這期間也可以被其他線程嘗試搶佔,但同一時間只能有一個線程能夠成功。
而對於共享鎖,顧名思義,它可以同時被多個線程持有鎖資源,換句話説,如果有一個線程獲取鎖成功了,其他來加鎖的或者在隊列等待的線程都可以嘗試加鎖,而且極有可能會加鎖成功。
(PS:還不懂?再舉個粗俗的例子,上廁所都上過吧:獨佔鎖就像家裏的衞生間,只有一個馬桶,同一時間只能有一個人在裏面方便;而共享鎖就像外面的公共廁所,有好幾個坑,同一時間可以有多個人一起方便。。。)
下面我來搞個表格直接從方法的角度來對比下:(我們只看最簡單的哈~那些什麼中斷的,超時處理的,就留給小夥伴們自己分析了) | 獨佔鎖 | 共享鎖 | | --- | --- | | acquire(int arg) | acquireShared(int arg) | | tryAcquire(int arg) | tryAcquireShared(int arg) | | acquireQueued(final Node node, int arg) | doAcquireShared(int arg) | | release(int arg) | releaseShared(int arg) | | tryRelease(int arg) | tryReleaseShared(int arg) | | unparkSuccessor(Node node) | doReleaseShared() |
貌似他倆的方法差不多都能對應上,也就最後一個名字好像不太一樣,但其實你進入doReleaseShared方法裏面,你就會知道,其實它最終也調用了unparkSuccessor方法,只不過共享鎖比獨佔鎖多了不一樣的地方,我們下面具體分析會説的。下面正文開始:(PS:本篇文字較多,需要加點想象力)
共享鎖的獲取分析
上源碼:代碼1acquireShared(int arg) 代碼2tryAcquireShared(int arg) ```java / * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ //代碼1 //AQS.acquireShared(arg) public final void acquireShared(int arg) { / 這個方法的實現很簡單,如果調用全了,就只調用了兩個方法 一個是嘗試獲取共享鎖,見下面代碼2 的分析 一個是獲取失敗了,進行排隊(也可能自旋再次嘗試獲取鎖成功,或者進入排隊等待的方法) */ if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
//代碼2
//AQS.tryAcquireShared
//是一個留待程序員自己實現的方法,是一個空方法,為什麼我們還要看他呢,因為它的定義很重要,我們看
//它的英文註釋,以及我的大概翻譯。。。
/**
* Attempts to acquire in shared mode. This method should query if
* the state of the object permits it to be acquired in the shared
* mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread.
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return a negative value on failure; zero if acquisition in shared
* mode succeeded but no subsequent shared-mode acquire can
* succeed; and a positive value if acquisition in shared
* mode succeeded and subsequent shared-mode acquires might
* also succeed, in which case a subsequent waiting thread
* must check availability. (Support for three different
* return values enables this method to be used in contexts
* where acquires only sometimes act exclusively.) Upon
* success, this object has been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
大概意思如下:
該方法嘗試以共享模式獲取鎖資源,實現該方法的自定義同步器需要自己去定義查閲資源是否可用
如果可以,則獲取它,如果不可以,則調用此方法的acquire方法將會讓當前線程排隊
第二個我們看返回值的定義:
返回值小於0:獲取鎖資源失敗
返回值等於0:獲取鎖資源成功,且再有線程來獲取,將會不成功
返回值大於0:獲取鎖資源成功,再有線程來獲取,有可能會成功。
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
以上分析總結來看,也就是説**只要tryAcquireShared返回值大於等於0,就代表獲取共享鎖成功,而小於0,就代表沒有資源可以獲取,就要進入doAcquireShared(arg)方法,想來也合情合理哈~** 下面着重來看doAcquireShared(arg)方法,**代碼3**
java
//代碼3
//AQS.doAcquireShared
private void doAcquireShared(int arg) {
/
對比來看,這個方法的流程幾乎和獨佔式的一樣
首先調用addWaiter包裝當前線程為隊列節點,並且將節點尾插入到隊列,如果隊列還未初始化,則先初始化隊列
要説不同,也就是節點的類型不一樣,一個是獨佔式的,一個是共享式的
/
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
/
再然後也是進入一個自旋的過程,這個過程和獨佔式也是差不多的
首先也是取出當前節點的前置節點,如果前置節點是head,則再次嘗試獲取鎖資源(防止之前持有鎖
的線程釋放鎖了);如果獲取成功了,則調用setHeadAndPropagate(node, r),將當前節點設置為
head,這裏有個不同的是,獨佔是是直接setHead,而這邊是setHeadAndPropagate,見代碼4
如果不成功的話,就會執行下面的shouldParkAfterFailedAcquire(p, node)
&&parkAndCheckInterrupt(),這一段在之前的篇章分析過了,主要就是設置前節點的狀態為
SIGNAL,然後會再次嘗試獲取,如果再次失敗,就會正式入隊,阻塞等待
如果前置節點不是head,則會直接進入shouldParkAfterFailedAcquire(p, node)
&&parkAndCheckInterrupt(),緊接着又會去獲取一次前置節點再來一次過程,防止做上一段循環的
過程中,已經有人釋放鎖了,當前的節點變成了head的後繼節點。。。如果不是,則會正式入隊,阻塞
等待。。。。。是不是很繞。。。(當然這一段的過程和獨佔鎖是一樣的)
*/
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//代碼4 //AQS.setHeadAndPropagate private void setHeadAndPropagate(Node node, int propagate) { / 我們看到,首先直接調用setHead方法,設置為頭節點,不同的是下面一段 / Node h = head; // Record old head for check below setHead(node); / * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. 咋一看,好多條件啊,都是||的關係,即下面的條件只要有一個為true,就會去取後繼節點 且如果後繼節點不為空或者後繼節點是共享節點的話,就會調用doReleaseShared(),這個方法 是用來釋放鎖資源的,我們下面會分析
從上面的分析來看,這個方法裏除了設置head節點外,還加了一個過程,就是當滿足一些條件後,
除了當前的線程會獲得鎖資源並且出隊列後,將繼續釋放鎖資源,以喚醒後繼節點嘗試獲取鎖。
哪些條件呢,而且這些條件的判斷講究了一個順序。。。我們知道||操作有個特點,||之前的為
true,他就會看都不看後面一眼,直接true,我們來看:
1、如果 propagate > 0 成立:propagate是上一層傳過來的,是tryAcquireShared(arg)方法的
返回值,我們上面分析過,tryAcquireShared(arg)返回值大於0,代表當前線程獲取鎖成功,再有
線程來獲取,有可能會成功,即説明鎖資源現在有多餘的了,那這時去通知後繼節點去加鎖,沒毛病。
如果不成立,則不就代表沒有鎖資源可以供嘗試獲取嗎?答案是否定的,你要記住我們分析的是多線
程併發場景,可能就在setHead方法的這段時間裏,已經發生了很多的事情。。。(所以這邊的||操作
大神講究了順序。)
2、下面的判斷其實很有意思,把他合起來看h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0),h為舊的head,看舊的head是否為空,然後設置頭節點的
head重新賦給h,即為新的頭節點,且他們判斷的條件是一致的,他其實就是在看,在設置頭節點的前後,
頭節點是否為空,如果為空則直接調用doReleaseShared,其實意思也就是head都為空了,説明一個問題,
隊列不存在了,沒有人排隊,那是不是意味着可能會獲取到鎖資源啊;如果head節點不為空,則看它的狀態
如果狀態小於0,説明頭節點可以被喚醒/傳播喚醒。。。所以也可以執行doReleaseShared
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
// private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } ```
共享鎖的獲取分析總結
- 在線程獲取失敗的情況下,會進入【doAcquireShared(int arg)】方法,取出前置節點:
- 1、如果前置節點是head,則進行CAS嘗試加鎖操作:
- 1.1、如果加鎖失敗,或者沒機會加鎖,則就會將前置節點設置為SIGNAL,然後再阻塞等待。
- 1.2、如果加鎖成功,則設置為頭節點,並且可能會嘗試喚醒後繼節點去嘗試加鎖
- 2、如果前置節點不是head,則先將前置節點設置為SIGNAL,然後再繼續自旋,可能會再次進入到1的流程,也有可能直接將前置節點設置為SIGNAL,然後再阻塞等待。
- 1、如果前置節點是head,則進行CAS嘗試加鎖操作:
共享鎖的釋放分析
直接上源碼,代碼5,代碼6 ```java //代碼5 //AQS.releaseShared public final boolean releaseShared(int arg) { /* 線程調用release之後,會先調用tryReleaseShared(arg),boolean類型返回值,見代碼6分析 即當前線程嘗試釋放鎖資源,成功的話,執行doReleaseShared,釋放鎖的源碼分析的精髓在這裏。。 見代碼7的分析 / if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
//代碼6
/**
這也是個空方法,我們看方法的定義和作用
參數arg,釋放鎖資源的個數,但經常是1,即一個線程持有一個鎖資源
返回值boolean,如果嘗試釋放鎖資源成功的話,則返回true,返回到releaseShared就是去執行
doReleaseShared,進行鎖的釋放
失敗的話,則沒有操作。。。
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
**代碼7**
java
//代碼7
//AQS.doReleaseShared
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
先大體翻譯下:
確保釋放過程的傳播性,即使有其他線程正在進行共享資源的獲取/釋放。
如果以通常的方式進行,且頭節點為SIGNAL狀態,嘗試喚醒解除head的後繼等待節點。
如果不是上面的操作,則把狀態設置為PROPAGATE,以確保在釋放時進行傳播。
另外,我們必須進行循環,以防在執行此操作時添加了新節點。
而且,與unpark後繼器的其他使用不同,我們需要知道CAS重置狀態是否失敗,如果失敗則需要重新檢查
按照上面的意思翻譯下來,乍一看,不懂他在説啥。。。
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//1
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//2
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//3
if (h == head) // loop if head changed
break;
}
}
``` 在分析代碼7之前,我們先看下,這個函數貌似在之前加鎖的過程中也調用了,所以這個方法有兩個地方調用,第一個就是releaseShared的時候,第二個就是在嘗試加鎖成功,設置為head節點後,也會去調用該方法。
幾個問題
- 為什麼這兩邊都要調用,為什麼在嘗試加鎖成功後,還去調用釋放鎖的方法?到底是哪個線程調用了這個方法?
- 我們細看這個方法裏的實現,for(;;),又是一個自旋,頭都大了,根據之前的經驗,既然是自旋,那肯定有退出循環的條件,那條件是什麼?
- 按照我們之前分析獨佔鎖釋放的過程理解,獨佔鎖釋放鎖成功,並且成功喚醒後繼線程,就直接退出了,為什麼共享鎖這邊需要進行自旋,它的目的是什麼?也就是説這個方法到底做了哪些事情? 我們帶着這些問題,逐行分析上面的過程。。。其實挺難理解的,如果不對,望海涵。。。
我們在上面的代碼中,我標出了 1 2 3 三個關鍵if分支: - 1、首先進入1的判斷,取出頭節點h,並判斷頭節點狀態 - 如果狀態為SIGNAL,則通過CAS方式將SIGNAL改為0 - 成功的話,就會去執行unparkSuccessor釋放head的後繼等待節點,然後去進入3的判斷 - 如果CAS失敗,則繼續再來一遍CAS。 - 如果取出的ws不為SIGNAL,則進入2(注意:1和2其實是一個互斥的情況,也就是説當前調用此方法的線程取出來的ws值,要麼走1的分支,要麼走到2) - 2、如果ws==0,什麼情況下ws==0,首先在執行上面的【compareAndSetWaitStatus(h, Node.SIGNAL, 0)】後,ws會等於0,在當前線程下顯然不會,那還有什麼情況ws==0,這時我們就會想到,我們每次有新節點的加入時,新節點(其實也是最後一個節點)ws會等於0,且恰恰剛變為head,那麼這時就會將頭節點的狀態CAS改為PROPAGATE,失敗的話,將繼續CAS修改,成功的話,則進入3 - 3、如果之前取出的h和head沒有發生變化,則跳出循環,否則繼續循環,將繼續循環喚醒下一個節點。。。循環往復。。
分析到這,我們來看看上面的問題 - 首先這個方法在兩處調用,一處是在線程嘗試獲取鎖成功,然後設置頭節點後,調用了;還有一處是線程自己去調用releaseShared的時候。假設如下場景,同步隊列中有head(X)->A->B->C->D四個線程排隊等待(X代表當前任意一個線程持有了鎖),假設現在A線程被喚醒並且拿到了共享鎖,那麼A就會被設置為頭節點,如下head(A)->B->C->D,這時A會調用doReleaseShared方法,喚醒B,假設B也獲取到了鎖,則就成為了新的頭節點,當下隊列變為head(B)->C->D,B這時設置頭節點後,他也會去調用doReleaseShared方法(其實在上面兩個過程中,是極有可能同時進行的,記住,我們是多線程併發,多線程併發!!這時有很多人就説明明是A喚醒B的,怎麼可能同時呢?那我這邊再假設假如不是A喚醒的,是上面的X呢?。。)也就是説上面的A線程在執行doReleaseShared循環體的過程中,B線程已經把頭節點設置為自己了,且B也去觸發了doReleaseShared方法,B也進入了doReleaseShared的循環體中了(也就是説A線程還在執行doReleaseShared循環體的過程中,頭節點已經不是之前的頭節點了),所以我們才有上面代碼3的判斷(h==head),而且我們也看到了只有當h==head的時候,才會退出循環,這也就是回答了第二問題。 從中我們也可以看到其實這個過程好像是一個不斷髮生,不斷的去觸發doReleaseShared的過程。為什麼大神代碼這麼寫,我猜測是這樣,大神是想:不斷地通過這種方式,來加快共享鎖獲取/釋放的過程,只要當前執行循環體中head節點發生了變化,其實也就是説有其他線程在釋放鎖,説明鎖資源又有了,那麼我就繼續循環,繼續釋放鎖,繼續喚醒後繼節點來嘗試加鎖(我認為這是大神極致優化的體現)。 其實不止上面的流程會導致這樣不斷的循環,我們再假設,這時有個線程F,本來是持有鎖的,現在任務結束了,他就調用releaseShared方法,同樣也會進入這個循環體。 相信分析到這裏已經回答了上面的幾個問題了。
這時我們再回過頭來,看看這個方法裏的註釋以及翻譯,是不是覺得抓住點什麼了,我把註釋及翻譯再貼一次:
java
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//1
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//2
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//3
if (h == head) // loop if head changed
break;
}
}
翻譯解釋,也當作是doReleaseShared的總結
- 1、確保釋放過程的傳播性,即使有其他線程正在進行共享資源的獲取/釋放。
- 這裏其實是對整個方法的一個描述,他是一個不斷重複進行釋放鎖嘗試獲取鎖的過程。
- 2、如果以通常的方式進行,且頭節點為SIGNAL狀態,嘗試喚醒解除head的後繼等待節點。
- 這邊其實是對第一個if分支的解釋
- 3、如果不是上面的操作,則把狀態設置為PROPAGATE,以確保在釋放時進行傳播。另外,我們必須進行循環,以防在執行此操作時添加了新節點。
- 這兩句話就是對第二個分支的解釋:這邊呼應了我上面的分析,每次有新節點的加入時,新節點(其實也是最後一個節點)ws會等於0,且恰恰剛變為head,再然後其實在【(ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))】的前面,還有一個大前提條件【(h != null && h != tail】,説明此時隊列裏至少兩個節點(一個是head,還有一個又是一個新節點),雖然很極限,但卻是有可能存在這種情況。 這時我們再看:【(ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))】假如ws==0為true,當【compareAndSetWaitStatus(h, 0, Node.PROPAGATE)】為false的時候,代表這個CAS操作失敗了,説明有其他線程將ws的值修改了,什麼情況會有其他線程修改ws的值,肯定又有新節點加入且將他改為了SIGNAL,則這時取反後為true,則會continue,繼續CAS修改,否則進入3,又是一個有可能繼續循環,繼續喚醒的過程。。。。從上面的分析來看,這個條件很苛刻,反正我是想不到這樣的場景。。。大神就是大神,牛逼plus!!真的是把各種細節和場景都想到了(我也是想了好久,翻閲了無數資料總結的,如有不對,望海涵。。)
- 4、而且,與unpark後繼器的其他使用不同,我們需要知道CAS重置狀態是否失敗,如果失敗則需要重新檢查
- 這邊是對第一個分支下,CAS將SIGNAL改為0失敗的解釋,失敗的話,則continue,繼續CAS嘗試修改。修改成功的話,則喚醒h的後繼節點。
到這裏其實releaseShared還未講完,繼續:如果節點被喚醒,則被喚醒的節點,將繼續進入doAcquireShared的循環體,繼續CAS嘗試獲取鎖,而這時一般情況下都是可以獲取到鎖資源的,則會設置為head,同時又將繼續執行doReleaseShared,再來循環。。。(腦袋都快被循環壞了)
其實這時再回過頭看AQS.setHeadAndPropagate的源碼,裏面説到了在設置好頭節點後,滿足一定的條件後,將調用doReleaseShared,其實這個條件無非就是大神優化到極致的體現了。
java
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
共享鎖的獲取/釋放總結
共享鎖的實現其實大部分和獨佔鎖類似,但也有不同,大致如下: - 第一個區別當然也是最大的區別就是,共享鎖的鎖資源可以被多個線程同時佔有(同時上廁所方便~),而獨佔鎖一個時間只能是一個線程持有鎖。 - 其次,由於共享鎖資源可以被多個線程同時佔有,所以在線程自旋嘗試獲取到鎖的同時,會去喚醒後繼節點也來嘗試獲取鎖。 - 再者,共享鎖的獲取/釋放過程,是一個一旦發生了,就有可能會不停執行、循環往復的過程,就如狀態PROPAGATE(傳播)一樣,這個過程具備傳播性(而且是病毒性的。。哈哈~),只有當頭節點沒發生變化才會停止。
到此,AQS的共享模式結束。以上只是我個人的看法,當然對於一些特殊細節和場景可能小夥伴們看法不一致,希望大家提出哈~
我正在參與掘金技術社區創作者簽約計劃招募活動,點擊鏈接報名投稿。