成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

聊聊 CountDownLatch 閉鎖源碼分析

開發 前端
閉鎖的作用相當于一扇門∶ 在閉鎖到達結束狀態之前,這扇門一直是關閉的,并且沒有任何線程能通過,當到達結束狀態時,這扇門會打開并允許所有的線程通過。

[[422507]]

本文轉載自微信公眾號「運維開發故事」,作者老鄭。轉載本文請聯系運維開發故事公眾號。

功能簡介

閉鎖是一種同步工具類,可以延遲線程的進度直到其到達終止狀態【CPJ 3.4.2】。閉鎖的作用相當于一扇門∶ 在閉鎖到達結束狀態之前,這扇門一直是關閉的,并且沒有任何線程能通過,當到達結束狀態時,這扇門會打開并允許所有的線程通過。當閉鎖到達結束狀態后,將不會再改變狀態,因此這扇門將永遠保持打開狀態。閉鎖可以用來確保某些活動直到其他活動都完成后才繼續執行,例如∶

  • 確保某個計算在其需要的所有資源都被初始化之后才繼續執行。二元閉鎖(包括兩個狀態)可以用來表示"資源R已經被初始化",而所有需要 R 的操作都必須先在這個閉鎖上等待。
  • 確保某個服務在其依賴的所有其他服務都已經啟動之后才啟動。每個服務都有一個相關的二元閉鎖。當啟動服務S 時,將首先在S依賴的其他服務的閉鎖上等待,在所有依賴的服務都啟動后會釋放閉鎖S,這樣其他依賴 S 的服務才能繼續執行。
  • 等待直到某個操作的所有參與者(例如,在多玩家游戲中的所有玩家)都就緒再繼續執行。在這種情況中,當所有玩家都準備就緒時,閉鎖將到達結束狀態。

CountDownLatch.jpg

CountDownLatch是一種靈活的閉鎖實現,可以在上述各種情況中使用,它可以使一個或多個線程等待一組事件發生。閉鎖狀態包括一個計數器,該計數器被初始化為一個正數,表示需要等待的事件數量。countDown方法遞減計數器,表示有一個事件已經發生了,而 await方法等待計數器達到零,這表示所有需要等待的事件都已經發生。如果計數器的值非零,那么 await 會一直阻塞直到計數器為零,或者等待中的線程中斷,或者等待超時。

使用案例

TestHarness 中給出了閉鎖的兩種常見用法。TestHarness 創建一定數量的線程,利用它們并發地執行指定的任務。它使用兩個閉鎖,分別表示"起始門(Starting Gate)"和"結束門(Ending Gate)"。起始門計數器的初始值為1,而結束門計數器的初始值為工作線程的數量。每個工作線程首先要做的就是在啟動門上等待,從而確保所有線程都就緒后才開始執行。而每個線程要做的最后一件事情是將調用結束門的 countDown 方法減1,這能使主線程高效地等待直到所有工作線程都執行完成,因此可以統計所消耗的時間。

  1. public class TestHarness { 
  2.  
  3.     public long timeTasks(int nThreads, final Runnable task) throws InterruptedException { 
  4.         final CountDownLatch startGate = new CountDownLatch(1); 
  5.         final CountDownLatch endGate = new CountDownLatch(nThreads); 
  6.  
  7.         for (int i = 0; i < nThreads; i++) { 
  8.             Thread t = new Thread(() -> { 
  9.                 try { 
  10.                     startGate.await(); 
  11.                     try { 
  12.                         task.run(); 
  13.                     } finally { 
  14.                         endGate.countDown(); 
  15.                     } 
  16.  
  17.                 } catch (InterruptedException ignored) { 
  18.  
  19.                 } 
  20.             }); 
  21.             t.start(); 
  22.         } 
  23.  
  24.         long start = System.nanoTime(); 
  25.         startGate.countDown(); 
  26.         endGate.await(); 
  27.         long end = System.nanoTime(); 
  28.         return end - start; 
  29.     } 
  30.  
  31.     public static void main(String[] args) throws InterruptedException { 
  32.         TestHarness testHarness = new TestHarness(); 
  33.         AtomicInteger num = new AtomicInteger(0); 
  34.         long time = testHarness.timeTasks(10, () -> System.out.println(num.incrementAndGet())); 
  35.         System.out.println("cost time: " + time + "ms"); 
  36.     } 
  37.  
  38. //輸出結果 
  39. 10 
  40. cost time: 2960900ms 

為什么要在 TestHarness 中使用閉鎖,而不是在線程創建后就立即啟動? 或許,我們希望測試 n 個線程并發執行某個任務時需要的時間。如果在創建線程后立即啟動它們,那么先啟動的線程將"領先"后啟動的線程,并且活躍線程數量會隨著時間的推移而增加或減少,競爭程度也在不斷發生變化。啟動門將使得主線程能夠實時釋放所有工作線程,而結束門則使主線程能夠等待最后一個線程執行完成,而不是順序地等待每個線程執行完成。

使用總結

CountDownLatch 是一次性的,計算器的值只能在構造方法中初始化一次,之后沒有任何機制再次對其設置值,當CountDownLatch 使用完畢后,它不能再次被使用。

源碼分析

代碼分析

CountDownLatch 在底層還是采用 AbstractQueuedSynchronizer 實現。

  1. CountDownLatch startGate = **new **CountDownLatch(1); 

我們先看它的構造方法, 創建了一個 sync 對象。

  1. public CountDownLatch(int count) { 
  2.     if (count < 0) throw new IllegalArgumentException("count < 0"); 
  3.     this.sync = new Sync(count); 

Sync 是 AbstractQueuedSynchronizer 的一個實現, 按照字面意思我們可以猜到它是公平方式實現。

  1. private static final class Sync extends AbstractQueuedSynchronizer { 
  2.     private static final long serialVersionUID = 4982264981922014374L; 
  3.  
  4.     // 構造方法 
  5.     Sync(int count) { 
  6.         setState(count); 
  7.     } 
  8.  
  9.     // 獲取資源數 
  10.     int getCount() { 
  11.         return getState(); 
  12.     } 
  13.  
  14.     // 獲取鎖 
  15.     protected int tryAcquireShared(int acquires) { 
  16.         return (getState() == 0) ? 1 : -1; 
  17.     } 
  18.  
  19.     // 釋放鎖 
  20.     protected boolean tryReleaseShared(int releases) { 
  21.         // Decrement count; signal when transition to zero 
  22.         for (;;) { 
  23.             int c = getState(); 
  24.             if (c == 0) 
  25.                 return false
  26.             int nextc = c-1; 
  27.             // CAS 解鎖 
  28.             if (compareAndSetState(c, nextc)) 
  29.                 return nextc == 0; 
  30.         } 
  31.     } 

在 await 方法中如果存在計算值, 那么當前線程將進入 AQS 隊列生成 Node 節點, 線程進入阻塞狀態。

  1. public void await() throws InterruptedException { 
  2.     sync.acquireSharedInterruptibly(1); 

其實主要是獲取共享鎖。

  1. public final void acquireSharedInterruptibly(int arg) 
  2.     throws InterruptedException { 
  3.     if (Thread.interrupted()) 
  4.         throw new InterruptedException(); 
  5.     if (tryAcquireShared(arg) < 0) 
  6.         doAcquireSharedInterruptibly(arg); 

CountDownLatch.Sync 實現了 tryAcquireShared 方法 ,如果 getState() == 0 返回 1 , 否則返回 -1. 也就是說創建 CountDownLatch 實例后再執行 await 方法將繼續調用 doAcquireSharedInterruptibly(arg);

  1. // 是否可獲取共享鎖 
  2. protected int tryAcquireShared(int acquires) { 
  3.     return (getState() == 0) ? 1 : -1; 
  4.  
  5.  
  6. // 嘗試獲取鎖, 或者入隊 
  7. private void doAcquireSharedInterruptibly(int arg) 
  8.     throws InterruptedException { 
  9.     final Node node = addWaiter(Node.SHARED); 
  10.     boolean failed = true
  11.     try { 
  12.         for (;;) { 
  13.             final Node p = node.predecessor(); 
  14.             if (p == head) { 
  15.                 int r = tryAcquireShared(arg); 
  16.                 if (r >= 0) { 
  17.                     setHeadAndPropagate(node, r); 
  18.                     p.next = null; // help GC 
  19.                     failed = false
  20.                     return
  21.                 } 
  22.             } 
  23.             if (shouldParkAfterFailedAcquire(p, node) && 
  24.                 parkAndCheckInterrupt()) 
  25.                 throw new InterruptedException(); 
  26.         } 
  27.     } finally { 
  28.         if (failed) 
  29.             cancelAcquire(node); 
  30.     } 

在 countDown 方法如果存在等待的線程, 將對其進行喚醒. 或者減少 CountDownLatch 資源數。

  1. public void countDown() { 
  2.     sync.releaseShared(1); 

通過 releaseShared 對共享鎖進行解鎖。 

  1. public final boolean releaseShared(int arg) { 
  2.     if (tryReleaseShared(arg)) { 
  3.         doReleaseShared(); 
  4.         return true
  5.     } 
  6.     return false

最終會調用 doReleaseShared 喚醒 AQS 中的頭節點。

  1. private void doReleaseShared() { 
  2.     /* 
  3.          * Ensure that a release propagates, even if there are other 
  4.          * in-progress acquires/releases.  This proceeds in the usual 
  5.          * way of trying to unparkSuccessor of head if it needs 
  6.          * signal. But if it does not, status is set to PROPAGATE to 
  7.          * ensure that upon release, propagation continues. 
  8.          * Additionally, we must loop in case a new node is added 
  9.          * while we are doing this. Also, unlike other uses of 
  10.          * unparkSuccessor, we need to know if CAS to reset status 
  11.          * fails, if so rechecking. 
  12.          */ 
  13.     for (;;) { 
  14.         Node h = head; 
  15.         if (h != null && h != tail) { 
  16.             int ws = h.waitStatus; 
  17.             if (ws == Node.SIGNAL) { 
  18.                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
  19.                     continue;            // loop to recheck cases 
  20.                 unparkSuccessor(h); 
  21.             } 
  22.             else if (ws == 0 && 
  23.                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
  24.                 continue;                // loop on failed CAS 
  25.         } 
  26.         if (h == head)                   // loop if head changed 
  27.             break; 
  28.     } 

詳細流程如下圖:

源碼流程圖

CountDownLatch 閉鎖源碼分析.png

參考資料

《Java 并發編程實戰》 

https://www.cnblogs.com/Lee_xy_z/p/10470181.html

 

責任編輯:武曉燕 來源: 運維開發故事
相關推薦

2021-09-07 07:53:42

Semaphore 信號量源碼

2021-08-10 07:00:00

Nacos Clien服務分析

2009-05-21 13:25:50

.NETCountDownLa微軟

2020-12-21 07:54:46

CountDownLa用法源碼

2022-09-28 11:34:27

用戶行為數據業務

2022-11-30 08:19:15

內存分配Go逃逸分析

2021-07-07 07:09:49

Redisson分布式鎖源碼

2021-05-28 08:52:45

Hive分析函數

2021-09-28 07:12:09

Linux內核入口

2021-07-07 05:00:17

初始化源碼

2023-09-13 07:02:23

2021-10-11 09:41:20

React位運算技巧前端

2021-12-30 22:50:32

KafkaConsumer 源碼

2019-07-25 12:46:32

Java高并發編程語言

2011-03-15 11:33:18

iptables

2020-05-06 22:07:53

UbuntuLinux操作系統

2014-08-26 11:11:57

AsyncHttpCl源碼分析

2022-06-05 23:31:28

ClionMySQL數據

2022-12-02 11:40:00

數據分析成果

2023-05-04 00:27:40

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 午夜影院网站 | 天天色影视综合 | 久久久久久国模大尺度人体 | 日韩一区二区三区视频 | 亚洲毛片| 天堂素人约啪 | 久久久人成影片一区二区三区 | 伊色综合久久之综合久久 | 色接久久| 国产一级片 | 成人欧美一区二区三区在线观看 | 久草网站 | 黄色在线免费观看 | 国产精品久久久久aaaa九色 | 欧美黄色一区 | 中文字幕 欧美 日韩 | 欧美日韩中文字幕在线 | 国产精品日韩欧美一区二区三区 | 久久国产精品无码网站 | 黄a免费看| 天天操天天干天天曰 | 国产精品永久免费视频 | 国产一级电影在线 | 99精品国产一区二区青青牛奶 | 中文字幕在线网 | 日日夜夜天天 | 偷拍亚洲色图 | 韩国av网站在线观看 | 激情一区二区三区 | av片在线播放 | 草久久 | 久草www | 天天操夜夜骑 | 亚洲 欧美 在线 一区 | 羞羞视频免费在线 | 北条麻妃99精品青青久久主播 | 精品国产乱码久久久久久闺蜜 | 黄网站免费观看 | 欧美日韩在线免费 | 一二区电影| 国产1区2区 |