面試侃集合 | SynchronousQueue公平模式篇
面試官:呦,小伙子來(lái)的挺早啊!
Hydra:那是,不能讓您等太久了啊(別廢話(huà)了快開(kāi)始吧,還趕著去下一場(chǎng)呢)。
面試官:前面兩輪表現(xiàn)還不錯(cuò),那我們今天繼續(xù)說(shuō)說(shuō)隊(duì)列中的SynchronousQueue吧。
Hydra:好的,SynchronousQueue和之前介紹過(guò)的隊(duì)列相比,稍微有一些特別,必須等到隊(duì)列中的元素被消費(fèi)后,才能繼續(xù)向其中添加新的元素,因此它也被稱(chēng)為無(wú)緩沖的等待隊(duì)列。
我還是先寫(xiě)一個(gè)例子吧,創(chuàng)建兩個(gè)線(xiàn)程,生產(chǎn)者線(xiàn)程putThread向SynchronousQueue中放入元素,消費(fèi)者線(xiàn)程takeThread從中取走元素:
- SynchronousQueue<Integer> queue=new SynchronousQueue<>(true);
- Thread putThread=new Thread(()->{
- for (int i = 0; i <= 2; i++) {
- try {
- System.out.println("put thread put:"+i);
- queue.put(i);
- System.out.println("put thread put:"+i+" awake");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- Thread takeThread=new Thread(()->{
- int j=0;
- while(j<2){
- try {
- j=queue.take();
- System.out.println("take from putThread:"+j);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- putThread.start();
- Thread.sleep(1000);
- takeThread.start();
執(zhí)行上面的代碼,查看結(jié)果:
- put thread put:0
- take from putThread:0
- put thread put:0 awake
- put thread put:1
- take from putThread:1
- put thread put:1 awake
- put thread put:2
- take from putThread:2
- put thread put:2 awake
可以看到,生產(chǎn)者線(xiàn)程在執(zhí)行put方法后就被阻塞,直到消費(fèi)者線(xiàn)程執(zhí)行take方法對(duì)隊(duì)列中的元素進(jìn)行了消費(fèi),生產(chǎn)者線(xiàn)程才被喚醒,繼續(xù)向下執(zhí)行。簡(jiǎn)單來(lái)說(shuō)運(yùn)行流程是這樣的:
面試官:就這?應(yīng)用誰(shuí)不會(huì)啊,不講講底層原理就想蒙混過(guò)關(guān)?
Hydra:別急啊,我們先從它的構(gòu)造函數(shù)說(shuō)起,根據(jù)參數(shù)不同,SynchronousQueue分為公平模式和非公平模式,默認(rèn)情況下為非公平模式
- public SynchronousQueue(boolean fair) {
- transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
- }
我們先來(lái)看看公平模式吧,該模式下底層使用的是TransferQueue隊(duì)列,內(nèi)部節(jié)點(diǎn)由QNode構(gòu)成,定義如下:
- volatile QNode next; // next node in queue
- volatile Object item; // CAS'ed to or from null
- volatile Thread waiter; // to control park/unpark
- final boolean isData;
- QNode(Object item, boolean isData) {
- this.item = item;
- this.isData = isData;
- }
item用來(lái)存儲(chǔ)數(shù)據(jù),isData用來(lái)區(qū)分節(jié)點(diǎn)是什么類(lèi)型的線(xiàn)程產(chǎn)生的,true表示是生產(chǎn)者,false表示是消費(fèi)者,是后面用來(lái)進(jìn)行節(jié)點(diǎn)匹配(complementary )的關(guān)鍵。在SynchronousQueue中匹配是一個(gè)非常重要的概念,例如一個(gè)線(xiàn)程先執(zhí)行put產(chǎn)生了一個(gè)節(jié)點(diǎn)放入隊(duì)列,另一個(gè)線(xiàn)程再執(zhí)行take產(chǎn)生了一個(gè)節(jié)點(diǎn),這兩個(gè)不同類(lèi)型的節(jié)點(diǎn)就可以匹配成功。
面試官:可是我看很多資料里說(shuō)SynchronousQueue是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,這點(diǎn)你是怎么理解的?
Hydra:通過(guò)上面節(jié)點(diǎn)中封裝的屬性,可以看出SynchronousQueue的隊(duì)列中封裝的節(jié)點(diǎn)更多針對(duì)的不是數(shù)據(jù),而是要執(zhí)行的操作,個(gè)人猜測(cè)這個(gè)說(shuō)法的出發(fā)點(diǎn)就是隊(duì)列中存儲(chǔ)的節(jié)點(diǎn)更多偏向于操作這一屬性。
面試官:好吧,接著往下說(shuō)隊(duì)列的結(jié)構(gòu)吧。
Hydra:TransferQueue中主要定義的屬性有下面這些:
- transient volatile QNode head;
- transient volatile QNode tail;
- transient volatile QNode cleanMe;
- TransferQueue() {
- QNode h = new QNode(null, false); // initialize to dummy node.
- head = h;
- tail = h;
- }
比較重要的有頭節(jié)點(diǎn)head、尾節(jié)點(diǎn)tail、以及用于標(biāo)記下一個(gè)要?jiǎng)h除的節(jié)點(diǎn)的cleanMe節(jié)點(diǎn)。在構(gòu)造函數(shù)初始化中創(chuàng)建了一個(gè)節(jié)點(diǎn),注釋中將它稱(chēng)為dummy node,也就是偽造的節(jié)點(diǎn),它的作用類(lèi)似于AQS中的頭節(jié)點(diǎn)的作用,實(shí)際操作的節(jié)點(diǎn)是它的下一個(gè)節(jié)點(diǎn)。
要說(shuō)SynchronousQueue,真是一個(gè)神奇的隊(duì)列,不管你調(diào)用的是put和offer,還是take和poll,它都一概交給核心的transfer方法去處理,只不過(guò)參數(shù)不同。今天我們拋棄源碼,通過(guò)畫(huà)圖對(duì)它進(jìn)行分析,首先看一下方法的定義:
- E transfer(E e, boolean timed, long nanos);
面試官:呦呵,就一個(gè)方法?我倒要看看它是怎么區(qū)分實(shí)現(xiàn)的入隊(duì)和出隊(duì)操作…
Hydra:在方法的參數(shù)中,timed和nanos用于標(biāo)識(shí)調(diào)用transfer的方法是否是能夠超時(shí)退出的,而e是否為空則可以說(shuō)明是生產(chǎn)者還是消費(fèi)者調(diào)用的此方法。如果e不為null,是生產(chǎn)者調(diào)用,如果e為null則是消費(fèi)者調(diào)用。方法的整體邏輯可以分為下面幾步:
1、若隊(duì)列為空,或隊(duì)列中的尾節(jié)點(diǎn)類(lèi)型和自己的類(lèi)型相同,那么準(zhǔn)備封裝一個(gè)新的QNode添加到隊(duì)列中。在添加新節(jié)點(diǎn)到隊(duì)尾的過(guò)程中,并沒(méi)有使用synchronized或ReentrantLock,而是通過(guò)CAS來(lái)保證線(xiàn)程之間的同步。
在添加新的QNode到隊(duì)尾前,會(huì)首先判斷之前取到的尾節(jié)點(diǎn)是否發(fā)生過(guò)改變,如果有改變的話(huà)那么放棄修改,進(jìn)行自旋,在下一次循環(huán)中再次判斷。當(dāng)檢查隊(duì)尾節(jié)點(diǎn)沒(méi)有發(fā)生改變后,構(gòu)建新的節(jié)點(diǎn)QNode,并將它添加到隊(duì)尾。
2、當(dāng)新節(jié)點(diǎn)被添加到隊(duì)尾后,會(huì)調(diào)用awaitFulfill方法,會(huì)根據(jù)傳遞的參數(shù)讓線(xiàn)程進(jìn)行自旋或直接掛起。方法的定義如下,參數(shù)中的timed為true時(shí),表示這是一個(gè)有等待超時(shí)的方法。
- Object awaitFulfill(QNode s, E e, boolean timed, long nanos);
在awaitFulfill方法中會(huì)進(jìn)行判斷,如果新節(jié)點(diǎn)是head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),考慮到可能很快它就會(huì)完成匹配后出隊(duì),先不將它掛起,進(jìn)行一定次數(shù)的自旋,超過(guò)自旋次數(shù)的上限后再進(jìn)行掛起。如果不是head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),避免自旋造成的資源浪費(fèi),則直接調(diào)用park或parkNanos掛起線(xiàn)程。
3、當(dāng)掛起的線(xiàn)程被中斷或到達(dá)超時(shí)時(shí)間,那么需要將節(jié)點(diǎn)從隊(duì)列中進(jìn)行移除,這時(shí)會(huì)執(zhí)行clean()方法。如果要被刪除的節(jié)點(diǎn)不是鏈表中的尾節(jié)點(diǎn),那么比較簡(jiǎn)單,直接使用CAS替換前一個(gè)節(jié)點(diǎn)的next指針。
如果要?jiǎng)h除的節(jié)點(diǎn)是鏈表中的尾節(jié)點(diǎn),就會(huì)有點(diǎn)復(fù)雜了,因?yàn)槎嗑€(xiàn)程環(huán)境下可能正好有其他線(xiàn)程正在向尾節(jié)點(diǎn)后添加新的節(jié)點(diǎn),這時(shí)如果直接刪除尾節(jié)點(diǎn)的話(huà),會(huì)造成后面節(jié)點(diǎn)的丟失。
這時(shí)候就會(huì)用到TransferQueue中定義的cleanMe標(biāo)記節(jié)點(diǎn)了,cleanMe的作用就是當(dāng)要被移除的節(jié)點(diǎn)是隊(duì)尾節(jié)點(diǎn)時(shí),用它來(lái)標(biāo)記隊(duì)尾節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)。具體在執(zhí)行過(guò)程中,又會(huì)分為兩種情況:
- cleanMe節(jié)點(diǎn)為null,說(shuō)明隊(duì)列在之前沒(méi)有標(biāo)記需要?jiǎng)h除的節(jié)點(diǎn)。這時(shí)會(huì)使用cleanMe來(lái)標(biāo)識(shí)該節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),標(biāo)記完成后退出clean方法,當(dāng)下一次執(zhí)行clean方法時(shí)才會(huì)刪除cleanMe的下一個(gè)節(jié)點(diǎn)。
- cleanMe節(jié)點(diǎn)不為null,那么說(shuō)明之前已經(jīng)標(biāo)記過(guò)需要?jiǎng)h除的節(jié)點(diǎn)。這時(shí)刪除cleanMe的下一個(gè)節(jié)點(diǎn),并清除當(dāng)前cleanMe標(biāo)記,并再將當(dāng)前節(jié)點(diǎn)未修改前的前驅(qū)節(jié)點(diǎn)標(biāo)記為cleanMe。注意,當(dāng)前要被刪除的節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)不會(huì)發(fā)生改變,即使這個(gè)前驅(qū)節(jié)點(diǎn)已經(jīng)在邏輯上從隊(duì)列中刪除掉了。
執(zhí)行完成clean方法后,transfer方法會(huì)直接返回null,說(shuō)明入隊(duì)操作失敗。
面試官:講了這么多,入隊(duì)的還都是一個(gè)類(lèi)型的節(jié)點(diǎn)吧?
Hydra:是的,TransferQueue隊(duì)列中,只會(huì)存在一個(gè)類(lèi)型的節(jié)點(diǎn),如果有另一個(gè)類(lèi)型的節(jié)點(diǎn)過(guò)來(lái),那么就會(huì)執(zhí)行出隊(duì)的操作了。
面試官:好吧,那你接著再說(shuō)說(shuō)出隊(duì)方法吧。
Hydra:相對(duì)入隊(duì)來(lái)說(shuō),出隊(duì)的邏輯就比較簡(jiǎn)單了。因?yàn)楝F(xiàn)在使用的是公平模式,所以當(dāng)隊(duì)列不為空,且隊(duì)列的head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)與當(dāng)前節(jié)點(diǎn)匹配成功時(shí),進(jìn)行出隊(duì)操作,喚醒head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),進(jìn)行數(shù)據(jù)的傳遞。
根據(jù)隊(duì)列中節(jié)點(diǎn)類(lèi)型的不同,可以分為兩種情況進(jìn)行分析:
1、如果head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)是put類(lèi)型,當(dāng)前新節(jié)點(diǎn)是take類(lèi)型。take線(xiàn)程取出put節(jié)點(diǎn)的item的值,并將其item變?yōu)閚ull,然后推進(jìn)頭節(jié)點(diǎn),喚醒被掛起的put線(xiàn)程,take線(xiàn)程返回item的值,完成數(shù)據(jù)的傳遞過(guò)程。
head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)被喚醒后,會(huì)推進(jìn)head節(jié)點(diǎn),雖然前面說(shuō)過(guò)隊(duì)列的head節(jié)點(diǎn)是一個(gè)dummy節(jié)點(diǎn),并不存儲(chǔ)數(shù)據(jù),理論上應(yīng)該將第二個(gè)節(jié)點(diǎn)直接移出隊(duì)列,但是源碼中還是將head節(jié)點(diǎn)出隊(duì),將原來(lái)的第二個(gè)節(jié)點(diǎn)變成了新的head節(jié)點(diǎn)。
2、同理,如果head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)是take類(lèi)型,當(dāng)前新節(jié)點(diǎn)是put類(lèi)型。put線(xiàn)程會(huì)將take節(jié)點(diǎn)的item設(shè)為自己的數(shù)據(jù)值,然后推進(jìn)頭節(jié)點(diǎn),并喚醒掛起的take線(xiàn)程,喚醒的take線(xiàn)程最終返回從put線(xiàn)程獲得的item的值。
此外,在take線(xiàn)程喚醒后,會(huì)將自己QNode的item指針指向自己,并將waiter中保存的線(xiàn)程置為null,方便之后被gc回收。
面試官:也就是說(shuō),在代碼中不一定非要生產(chǎn)者先去生產(chǎn)產(chǎn)品,也可以由消費(fèi)者先到達(dá)后進(jìn)行阻塞等待?
Hydra:是的,兩種線(xiàn)程都可以先進(jìn)入隊(duì)列。
面試官:好了,公平模式下我是明白了,我去喝口水,給你十分鐘時(shí)間,回來(lái)我們聊聊非公平模式的實(shí)現(xiàn)吧。
本文轉(zhuǎn)載自微信公眾號(hào)「碼農(nóng)參上」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系碼農(nóng)參上公眾號(hào)。