成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

徹底搞懂Java線程池的工作原理

開(kāi)發(fā) 后端
本篇文章將深入分析Java中線程池的工作原理。這也是為什么把線程池放到最后來(lái)寫(xiě)的原因。本篇文章權(quán)當(dāng)是一個(gè)并發(fā)系列的綜合練習(xí),剛好鞏固實(shí)踐一下前面知識(shí)點(diǎn)的運(yùn)用。

[[411692]]

 前言

多線程并發(fā)是Java語(yǔ)言中非常重要的一塊內(nèi)容,同時(shí),也是Java基礎(chǔ)的一個(gè)難點(diǎn)。說(shuō)它重要是因?yàn)槎嗑€程是日常開(kāi)發(fā)中頻繁用到的知識(shí),說(shuō)它難是因?yàn)槎嗑€程并發(fā)涉及到的知識(shí)點(diǎn)非常之多,想要完全掌握J(rèn)ava的并發(fā)相關(guān)知識(shí)并非易事。也正因此,Java并發(fā)成了Java面試中最高頻的知識(shí)點(diǎn)之一。

本篇文章將深入分析Java中線程池的工作原理。個(gè)人認(rèn)為線程池是Java并發(fā)中比較難已理解的一塊知識(shí),因?yàn)榫€程池內(nèi)部實(shí)現(xiàn)使用到了大量的像ReentrantLock、AQS、AtomicInteger、CAS以及“生產(chǎn)者-消費(fèi)者”模型等并發(fā)相關(guān)的知識(shí),基本上涵蓋了并發(fā)系列前幾篇文章的大部分知識(shí)點(diǎn)。這也是為什么把線程池放到最后來(lái)寫(xiě)的原因。本篇文章權(quán)當(dāng)是一個(gè)并發(fā)系列的綜合練習(xí),剛好鞏固實(shí)踐一下前面知識(shí)點(diǎn)的運(yùn)用。

線程池基礎(chǔ)知識(shí)

在Java語(yǔ)言中,雖然創(chuàng)建并啟動(dòng)一個(gè)線程非常方便,但是由于創(chuàng)建線程需要占用一定的操作系統(tǒng)資源,在高并發(fā)的情況下,頻繁的創(chuàng)建和銷毀線程會(huì)大量消耗CPU和內(nèi)存資源,對(duì)程序性能造成很大的影響。為了避免這一問(wèn)題,Java給我們提供了線程池。

線程池是一種基于池化技術(shù)思想來(lái)管理線程的工具。在線程池中維護(hù)了多個(gè)線程,由線程池統(tǒng)一的管理調(diào)配線程來(lái)執(zhí)行任務(wù)。通過(guò)線程復(fù)用,減少了頻繁創(chuàng)建和銷毀線程的開(kāi)銷。

本章內(nèi)容我們先來(lái)了解一下線程池的一些基礎(chǔ)知識(shí),學(xué)習(xí)如何使用線程池以及了解線程池的生命周期。

線程池的使用

線程池的使用和創(chuàng)建可以說(shuō)非常的簡(jiǎn)單,這得益于JDK提供給我們良好封裝的API。線程池的實(shí)現(xiàn)被封裝到了ThreadPoolExecutor中,我們可以通過(guò)ThreadPoolExecutor的構(gòu)造方法來(lái)實(shí)例化出一個(gè)線程池,代碼如下: 

  1. // 實(shí)例化一個(gè)線程池  
  2. ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 60,  
  3.         TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));  
  4. // 使用線程池執(zhí)行一個(gè)任務(wù)          
  5. executor.execute(() -> {  
  6.     // Do something  
  7. });  
  8. // 關(guān)閉線程池,會(huì)阻止新任務(wù)提交,但不影響已提交的任務(wù)  
  9. executor.shutdown();  
  10. // 關(guān)閉線程池,阻止新任務(wù)提交,并且中斷當(dāng)前正在運(yùn)行的線程  
  11. executor.showdownNow(); 

創(chuàng)建好線程池后直接調(diào)用execute方法并傳入一個(gè)Runnable參數(shù)即可將任務(wù)交給線程池執(zhí)行,通過(guò)shutdown/shutdownNow方法可以關(guān)閉線程池。

ThreadPoolExecutor的構(gòu)造方法中參數(shù)眾多,對(duì)于初學(xué)者而言在沒(méi)有了解各個(gè)參數(shù)的作用的情況下很難去配置合適的線程池。因此Java還為我們提供了一個(gè)線程池工具類Executors來(lái)快捷的創(chuàng)建線程池。Executors提供了很多簡(jiǎn)便的創(chuàng)建線程池的方法,舉兩個(gè)例子,代碼如下: 

  1. // 實(shí)例化一個(gè)單線程的線程池  
  2. ExecutorService singleExecutor = Executors.newSingleThreadExecutor();  
  3. // 創(chuàng)建固定線程個(gè)數(shù)的線程池 
  4. ExecutorService fixedExecutor = Executors.newFixedThreadPool(10);  
  5. // 創(chuàng)建一個(gè)可重用固定線程數(shù)的線程池  
  6. ExecutorService executorService2 = Executors.newCachedThreadPool(); 

但是,通常來(lái)說(shuō)在實(shí)際開(kāi)發(fā)中并不推薦直接使用Executors來(lái)創(chuàng)建線程池,而是需要根據(jù)項(xiàng)目實(shí)際情況配置適合自己項(xiàng)目的線程池,關(guān)于如何配置合適的線程池這是后話,需要我們理解線程池的各個(gè)參數(shù)以及線程池的工作原理之后才能有答案。

線程池的生命周期

線程池從誕生到死亡,中間會(huì)經(jīng)歷RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五個(gè)生命周期狀態(tài)。

  •  RUNNING 表示線程池處于運(yùn)行狀態(tài),能夠接受新提交的任務(wù)且能對(duì)已添加的任務(wù)進(jìn)行處理。RUNNING狀態(tài)是線程池的初始化狀態(tài),線程池一旦被創(chuàng)建就處于RUNNING狀態(tài)。
  •  SHUTDOWN 線程處于關(guān)閉狀態(tài),不接受新任務(wù),但可以處理已添加的任務(wù)。RUNNING狀態(tài)的線程池調(diào)用shutdown后會(huì)進(jìn)入SHUTDOWN狀態(tài)。
  •  STOP 線程池處于停止?fàn)顟B(tài),不接收任務(wù),不處理已添加的任務(wù),且會(huì)中斷正在執(zhí)行任務(wù)的線程。RUNNING狀態(tài)的線程池調(diào)用了shutdownNow后會(huì)進(jìn)入STOP狀態(tài)。
  •  TIDYING 當(dāng)所有任務(wù)已終止,且任務(wù)數(shù)量為0時(shí),線程池會(huì)進(jìn)入TIDYING。當(dāng)線程池處于SHUTDOWN狀態(tài)時(shí),阻塞隊(duì)列中的任務(wù)被執(zhí)行完了,且線程池中沒(méi)有正在執(zhí)行的任務(wù)了,狀態(tài)會(huì)由SHUTDOWN變?yōu)門IDYING。當(dāng)線程處于STOP狀態(tài)時(shí),線程池中沒(méi)有正在執(zhí)行的任務(wù)時(shí)則會(huì)由STOP變?yōu)門IDYING。
  •  TERMINATED 線程終止?fàn)顟B(tài)。處于TIDYING狀態(tài)的線程執(zhí)行terminated()后進(jìn)入TERMINATED狀態(tài)。

根據(jù)上述線程池生命周期狀態(tài)的描述,可以畫(huà)出如下所示的線程池生命周期狀態(tài)流程示意圖。

線程池的工作機(jī)制

ThreadPoolExecutor中的參數(shù)

上一小節(jié)中,我們使用ThreadPoolExecutor的構(gòu)造方法來(lái)創(chuàng)建了一個(gè)線程池。其實(shí)在ThreadPoolExecutor中有多個(gè)構(gòu)造方法,但是最終都調(diào)用到了下邊代碼中的這一個(gè)構(gòu)造方法: 

  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     public ThreadPoolExecutor(int corePoolSize,  
  3.                               int maximumPoolSize,  
  4.                               long keepAliveTime, 
  5.                                TimeUnit unit,  
  6.                               BlockingQueue<Runnable> workQueue,  
  7.                               ThreadFactory threadFactory,  
  8.                               RejectedExecutionHandler handler) {  
  9.         // ...省略校驗(yàn)相關(guān)代碼  
  10.         this.corePoolSize = corePoolSize;  
  11.         this.maximumPoolSize = maximumPoolSize;  
  12.         this.workQueue = workQueue;  
  13.         this.keepAliveTime = unit.toNanos(keepAliveTime);  
  14.         this.threadFactory = threadFactory;  
  15.         this.handler = handler;  
  16.     }  
  17.     // ...     

這個(gè)構(gòu)造方法中有7個(gè)參數(shù)之多,我們逐個(gè)來(lái)看每個(gè)參數(shù)所代表的含義:

  •  corePoolSize 表示線程池的核心線程數(shù)。當(dāng)有任務(wù)提交到線程池時(shí),如果線程池中的線程數(shù)小于corePoolSize,那么則直接創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。
  •  workQueue 任務(wù)隊(duì)列,它是一個(gè)阻塞隊(duì)列,用于存儲(chǔ)來(lái)不及執(zhí)行的任務(wù)的隊(duì)列。當(dāng)有任務(wù)提交到線程池的時(shí)候,如果線程池中的線程數(shù)大于等于corePoolSize,那么這個(gè)任務(wù)則會(huì)先被放到這個(gè)隊(duì)列中,等待執(zhí)行。
  •  maximumPoolSize 表示線程池支持的最大線程數(shù)量。當(dāng)一個(gè)任務(wù)提交到線程池時(shí),線程池中的線程數(shù)大于corePoolSize,并且workQueue已滿,那么則會(huì)創(chuàng)建新的線程執(zhí)行任務(wù),但是線程數(shù)要小于等于maximumPoolSize。
  •  keepAliveTime 非核心線程空閑時(shí)保持存活的時(shí)間。非核心線程即workQueue滿了之后,再提交任務(wù)時(shí)創(chuàng)建的線程,因?yàn)檫@些線程不是核心線程,所以它空閑時(shí)間超過(guò)keepAliveTime后則會(huì)被回收。
  •  unit 非核心線程空閑時(shí)保持存活的時(shí)間的單位
  •  threadFactory 創(chuàng)建線程的工廠,可以在這里統(tǒng)一處理創(chuàng)建線程的屬性
  •  handler 拒絕策略,當(dāng)線程池中的線程達(dá)到maximumPoolSize線程數(shù)后且workQueue已滿的情況下,再向線程池提交任務(wù)則執(zhí)行對(duì)應(yīng)的拒絕策略

線程池工作流程

線程池提交任務(wù)是從execute方法開(kāi)始的,我們可以從execute方法來(lái)分析線程池的工作流程。

(1)當(dāng)execute方法提交一個(gè)任務(wù)時(shí),如果線程池中線程數(shù)小于corePoolSize,那么不管線程池中是否有空閑的線程,都會(huì)創(chuàng)建一個(gè)新的線程來(lái)執(zhí)行任務(wù)。

(2)當(dāng)execute方法提交一個(gè)任務(wù)時(shí),線程池中的線程數(shù)已經(jīng)達(dá)到了corePoolSize,且此時(shí)沒(méi)有空閑的線程,那么則會(huì)將任務(wù)存儲(chǔ)到workQueue中。

(3)如果execute提交任務(wù)時(shí)線程池中的線程數(shù)已經(jīng)到達(dá)了corePoolSize,并且workQueue已滿,那么則會(huì)創(chuàng)建新的線程來(lái)執(zhí)行任務(wù),但總線程數(shù)應(yīng)該小于maximumPoolSize。

(4)如果線程池中的線程執(zhí)行完了當(dāng)前的任務(wù),則會(huì)嘗試從workQueue中取出第一個(gè)任務(wù)來(lái)執(zhí)行。如果workQueue為空則會(huì)阻塞線程。

(5)如果execute提交任務(wù)時(shí),線程池中的線程數(shù)達(dá)到了maximumPoolSize,且workQueue已滿,此時(shí)會(huì)執(zhí)行拒絕策略來(lái)拒絕接受任務(wù)。

(6)如果線程池中的線程數(shù)超過(guò)了corePoolSize,那么空閑時(shí)間超過(guò)keepAliveTime的線程會(huì)被銷毀,但程池中線程個(gè)數(shù)會(huì)保持為corePoolSize。

(7)如果線程池存在空閑的線程,并且設(shè)置了allowCoreThreadTimeOut為true。那么空閑時(shí)間超過(guò)keepAliveTime的線程都會(huì)被銷毀。

線程池的拒絕策略

如果線程池中的線程數(shù)達(dá)到了maximumPoolSize,并且workQueue隊(duì)列存儲(chǔ)滿的情況下,線程池會(huì)執(zhí)行對(duì)應(yīng)的拒絕策略。在JDK中提供了RejectedExecutionHandler接口來(lái)執(zhí)行拒絕操作。實(shí)現(xiàn)RejectedExecutionHandler的類有四個(gè),對(duì)應(yīng)了四種拒絕策略。分別如下:DiscardPolicy 當(dāng)提交任務(wù)到線程池中被拒絕時(shí),線程池會(huì)丟棄這個(gè)被拒絕的任務(wù)

  •  DiscardOldestPolicy 當(dāng)提交任務(wù)到線程池中被拒絕時(shí),線程池會(huì)丟棄等待隊(duì)列中最老的任務(wù)。
  •  CallerRunsPolicy 當(dāng)提交任務(wù)到線程池中被拒絕時(shí),會(huì)在線程池當(dāng)前正在運(yùn)行的Thread線程中處理被拒絕額任務(wù)。即哪個(gè)線程提交的任務(wù)哪個(gè)線程去執(zhí)行。
  •  AbortPolicy 當(dāng)提交任務(wù)到線程池中被拒絕時(shí),直接拋出RejectedExecutionException異常。

線程池源碼分析

從上一章對(duì)線程池的工作流程解讀來(lái)看,線程池的原理似乎并沒(méi)有很難。但是開(kāi)篇時(shí)我說(shuō)過(guò)想要讀懂線程池的源碼并不容,主要原因是線程池內(nèi)部運(yùn)用到了大量并發(fā)相關(guān)知識(shí),另外還與線程池中用到的位運(yùn)算有關(guān)。

線程池中的位運(yùn)算(了解內(nèi)容)

在向線程池提交任務(wù)時(shí)有兩個(gè)比較中要的參數(shù)會(huì)決定任務(wù)的去向,這兩個(gè)參數(shù)分別是線程池的狀態(tài)和線程池中的線程數(shù)。在ThreadPoolExecutor內(nèi)部使用了一個(gè)AtomicInteger類型的整數(shù)ctl來(lái)表示這兩個(gè)參數(shù),代碼如下: 

  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     // Integer.SIZE = 32.所以 COUNT_BITS29  
  3.     private static final int COUNT_BITS = Integer.SIZE - 3;  
  4.     // 00001111 11111111 11111111 11111111 這個(gè)值可以表示線程池的最大線程容量  
  5.     private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;  
  6.     // 將-1左移29位得到RUNNING狀態(tài)的值  
  7.     private static final int RUNNING    = -1 << COUNT_BITS;     
  8.     // 線程池運(yùn)行狀態(tài)和線程數(shù)  
  9.     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  
  10.     private static int ctlOf(int rs, int wc) { return rs | wc; }  
  11.     // ...  
  12. }     

因?yàn)樯婕岸嗑€程的操作,這里為了保證原子性,ctl參數(shù)使用了AtomicInteger類型,并且通過(guò)ctlOf方法來(lái)計(jì)算出了ctl的初始值。如果你不了解位運(yùn)算大概很難理解上述代碼的用意。

我們知道,int類型在Java中占用4byte的內(nèi)存,一個(gè)byte占用8bit,所以Java中的int類型共占用32bit。對(duì)于這個(gè)32bit,我們可以進(jìn)行高低位的拆分。做Android開(kāi)發(fā)的同學(xué)應(yīng)該都了解View測(cè)量流程中的MeasureSpec參數(shù),這個(gè)參數(shù)將32bit的int拆分成了高2位和低30位,分別表示View的測(cè)量模式和測(cè)量值。而這里的ctl與MeasureSpec類似,ctl將32位的int拆分成了高3位和低29位,分別表示線程池的運(yùn)行狀態(tài)和線程池中的線程個(gè)數(shù)。

下面我們通過(guò)位運(yùn)算來(lái)驗(yàn)證一下ctl是如何工作的,當(dāng)然,如果你不理解這個(gè)位運(yùn)算的過(guò)程對(duì)理解線程池的源碼影響并不大,所以對(duì)以下驗(yàn)證內(nèi)容不感興趣的同學(xué)可以直接略過(guò)。

可以看到上述代碼中RUNNING的值為-1左移29位,我們知道在計(jì)算機(jī)中**負(fù)數(shù)是以其絕對(duì)值的補(bǔ)碼來(lái)表示的,而補(bǔ)碼是由反碼加1得到。**因此-1在計(jì)算機(jī)中存儲(chǔ)形式為1的反碼+1。 

  1. 1的原碼:00000000 00000000 00000000 00000001  
  2.                                             +  
  3. 1的反碼:11111111 11111111 11111111 11111110  
  4.        ---------------------------------------  
  5. -1存儲(chǔ): 11111111 11111111 11111111 11111111 

接下來(lái)對(duì)-1左移29位可以得到RUNNING的值為: 

  1. // 高三位表示線程狀態(tài),即高三位為111表示RUNNING  
  2. 11100000 00000000 00000000 00000000 

而AtomicInteger初始線程數(shù)量是0,因此ctlOf方法中的“|”運(yùn)算如下: 

  1. RUNNING:  11100000 00000000 00000000 00000000  
  2.                                                |  
  3. 線程數(shù)為0:  00000000 00000000 00000000 00000000  
  4.           ---------------------------------------  
  5. 得到ctl:   11100000 00000000 00000000 00000000 

通過(guò)RUNNING|0(線程數(shù))即可得到ctl的初始值。同時(shí)還可以通過(guò)以下方法將ctl拆解成運(yùn)行狀態(tài)和線程數(shù): 

  1. // 00001111 11111111 11111111 11111111  
  2. private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;  
  3. // 獲取線程池運(yùn)行狀態(tài) 
  4. private static int runStateOf(int c)     { return c & ~COUNT_MASK; }  
  5. // 獲取線程池中的線程數(shù)  
  6. private static int workerCountOf(int c)  { return c & COUNT_MASK; } 

假設(shè)此時(shí)線程池為RUNNING狀態(tài),且線程數(shù)為0,驗(yàn)證一下runStateOf是如何得到線程池的運(yùn)行狀態(tài)的: 

  1. COUNT_MASK:  00001111 11111111 11111111 11111111  
  2. ~COUNT_MASK: 11110000 00000000 00000000 00000000  
  3.                                                   &  
  4. ctl:         11100000 00000000 00000000 00000000  
  5.             ---------------------------------------- 
  6.  RUNNING:     11100000 00000000 00000000 00000000      

如果不理解上邊的驗(yàn)證流程沒(méi)有關(guān)系,只要知道通過(guò)runStateOf方法可以得到線程池的運(yùn)行狀態(tài),通過(guò)workerCountOf可以得到線程池中的線程數(shù)即可。

接下來(lái)我們進(jìn)入線程池的源碼的源碼分析環(huán)節(jié)。

ThreadPoolExecutor的execute

向線程池提交任務(wù)的方法是execute方法,execute方法是ThreadPoolExecutor的核心方法,以此方法為入口來(lái)進(jìn)行剖析,execute方法的代碼如下: 

  1. public void execute(Runnable command) {  
  2.      if (command == null)  
  3.          throw new NullPointerException();  
  4.      // 獲取ctl的值  
  5.      int c = ctl.get();  
  6.      // 1.線程數(shù)小于corePoolSize  
  7.      if (workerCountOf(c) < corePoolSize) {  
  8.          // 線程池中線程數(shù)小于核心線程數(shù),則嘗試創(chuàng)建核心線程執(zhí)行任務(wù)  
  9.          if (addWorker(command, true))  
  10.              return;  
  11.          c = ctl.get(); 
  12.      }  
  13.      // 2.到此處說(shuō)明線程池中線程數(shù)大于核心線程數(shù)或者創(chuàng)建線程失敗  
  14.      if (isRunning(c) && workQueue.offer(command)) {  
  15.          // 如果線程是運(yùn)行狀態(tài)并且可以使用offer將任務(wù)加入阻塞隊(duì)列未滿,offer是非阻塞操作。  
  16.          int recheck = ctl.get();  
  17.          // 重新檢查線程池狀態(tài),因?yàn)樯洗螜z測(cè)后線程池狀態(tài)可能發(fā)生改變,如果非運(yùn)行狀態(tài)就移除任務(wù)并執(zhí)行拒絕策略  
  18.          if (! isRunning(recheck) && remove(command))  
  19.              reject(command);  
  20.          // 如果是運(yùn)行狀態(tài),并且線程數(shù)是0,則創(chuàng)建線程  
  21.          else if (workerCountOf(recheck) == 0)  
  22.              // 線程數(shù)是0,則創(chuàng)建非核心線程,且不指定首次執(zhí)行任務(wù),這里的第二個(gè)參數(shù)其實(shí)沒(méi)有實(shí)際意義  
  23.              addWorker(null, false);  
  24.      }  
  25.      // 3.阻塞隊(duì)列已滿,創(chuàng)建非核心線程執(zhí)行任務(wù)  
  26.      else if (!addWorker(command, false))  
  27.          // 如果失敗,則執(zhí)行拒絕策略  
  28.          reject(command);  
  29.  } 

execute方法中的邏輯可以分為三部分:

  •  1.如果線程池中的線程數(shù)小于核心線程,則直接調(diào)用addWorker方法創(chuàng)建新線程來(lái)執(zhí)行任務(wù)。
  •  2.如果線程池中的線程數(shù)大于核心線程數(shù),則將任務(wù)添加到阻塞隊(duì)列中,接著再次檢驗(yàn)線程池的運(yùn)行狀態(tài),因?yàn)樯洗螜z測(cè)過(guò)之后線程池狀態(tài)有可能發(fā)生了變化,如果線程池關(guān)閉了,那么移除任務(wù),執(zhí)行拒絕策略。如果線程依然是運(yùn)行狀態(tài),但是線程池中沒(méi)有線程,那么就調(diào)用addWorker方法創(chuàng)建線程,注意此時(shí)傳入任務(wù)參數(shù)是null,即不指定執(zhí)行任務(wù),因?yàn)槿蝿?wù)已經(jīng)加入了阻塞隊(duì)列。創(chuàng)建完線程后從阻塞隊(duì)列中取出任務(wù)執(zhí)行。
  •  3.如果第2步將任務(wù)添加到阻塞隊(duì)列失敗了,說(shuō)明阻塞隊(duì)列任務(wù)已滿,那么則會(huì)執(zhí)行第三步,即創(chuàng)建非核心線程來(lái)執(zhí)行任務(wù),如果非核心線程創(chuàng)建失敗那么就執(zhí)行拒絕策略。

可以看到,代碼的執(zhí)行邏輯和我們?cè)诘诙轮蟹治龅木€程池的工作流程是一樣的。

接下來(lái)看下execute方法中創(chuàng)建線程的方法addWoker,addWoker方法承擔(dān)了核心線程和非核心線程的創(chuàng)建,通過(guò)一個(gè)boolean參數(shù)core來(lái)區(qū)分是創(chuàng)建核心線程還是非核心線程。先來(lái)看addWorker方法前半部分的代碼: 

  1. // 返回值表示是否成功創(chuàng)建了線程  
  2.   private boolean addWorker(Runnable firstTask, boolean core) {  
  3.        // 這里做了一個(gè)retry標(biāo)記,相當(dāng)于goto.  
  4.        retry:  
  5.        for (int c = ctl.get();;) {  
  6.            // Check if queue empty only if necessary.  
  7.            if (runStateAtLeast(c, SHUTDOWN)  
  8.                && (runStateAtLeast(c, STOP)  
  9.                    || firstTask != null  
  10.                    || workQueue.isEmpty()))  
  11.                return false;  
  12.            for (;;) {  
  13.                // 根據(jù)core來(lái)確定創(chuàng)建最大線程數(shù),超過(guò)最大值則創(chuàng)建線程失敗,注意這里的最大值可能有s三個(gè)corePoolSize、maximumPoolSize和線程池線程的最大容量  
  14.                if (workerCountOf(c)  
  15.                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))  
  16.                    return false;  
  17.                // 通過(guò)CAS來(lái)將線程數(shù)+1,如果成功則跳出循環(huán),執(zhí)行下邊邏輯     
  18.                if (compareAndIncrementWorkerCount(c))  
  19.                    break retry;  
  20.                c = ctl.get();  // Re-read ctl  
  21.                // 線程池的狀態(tài)發(fā)生了改變,退回retry重新執(zhí)行  
  22.                if (runStateAtLeast(c, SHUTDOWN))  
  23.                    continue retry;  
  24.            }  
  25.        }  
  26.        // ...省略后半部分  
  27.        return workerStarted;  
  28.    } 

這部分代碼會(huì)通過(guò)是否創(chuàng)建核心線程來(lái)確定線程池中線程數(shù)的值,如果是創(chuàng)建核心線程,那么最大值不能超過(guò)corePoolSize,如果是創(chuàng)建非核心線程那么線程數(shù)不能超過(guò)maximumPoolSize,另外無(wú)論是創(chuàng)建核心線程還是非核心線程,最大線程數(shù)都不能超過(guò)線程池允許的最大線程數(shù)COUNT_MASK(有可能設(shè)置的maximumPoolSize大于COUNT_MASK)。如果線程數(shù)大于最大值就返回false,創(chuàng)建線程失敗。

接下來(lái)通過(guò)CAS將線程數(shù)加1,如果成功那么就break retry結(jié)束無(wú)限循環(huán),如果CAS失敗了則就continue retry從新開(kāi)始for循環(huán),注意這里的retry不是Java的關(guān)鍵字,是一個(gè)可以任意命名的字符。

接下來(lái),如果能繼續(xù)向下執(zhí)行則開(kāi)始執(zhí)行創(chuàng)建線程并執(zhí)行任務(wù)的工作了,看下addWorker方法的后半部分代碼: 

  1. private boolean addWorker(Runnable firstTask, boolean core) {  
  2.        // ...省略前半部分  
  3.        boolean workerStarted = false 
  4.        boolean workerAdded = false 
  5.        Worker w = null 
  6.        try { 
  7.             // 實(shí)例化一個(gè)Worker,內(nèi)部封裝了線程  
  8.            w = new Worker(firstTask);  
  9.            // 取出新建的線程  
  10.            final Thread t = w.thread;  
  11.            if (t != null) {  
  12.                // 這里使用ReentrantLock加鎖保證線程安全  
  13.                final ReentrantLock mainLock = this.mainLock;  
  14.                mainLock.lock();  
  15.                try {  
  16.                    int c = ctl.get();  
  17.                    // 拿到鎖湖重新檢查線程池狀態(tài),只有處于RUNNING狀態(tài)或者處于SHUTDOWN并且firstTask==null時(shí)候才會(huì)創(chuàng)建線程  
  18.                    if (isRunning(c) ||  
  19.                        (runStateLessThan(c, STOP) && firstTask == null)) {  
  20.                        // 線程不是處于NEW狀態(tài),說(shuō)明線程已經(jīng)啟動(dòng),拋出異常  
  21.                        if (t.getState() != Thread.State.NEW)  
  22.                            throw new IllegalThreadStateException();  
  23.                        // 將線程加入線程隊(duì)列,這里的worker是一個(gè)HashSet    
  24.                        workers.add(w);  
  25.                        workerAdded = true 
  26.                        int s = workers.size();  
  27.                        if (s > largestPoolSize)  
  28.                            largestPoolSize = s;  
  29.                    }  
  30.                } finally {  
  31.                    mainLock.unlock();  
  32.                }  
  33.                if (workerAdded) {  
  34.                    // 開(kāi)啟線程執(zhí)行任務(wù)  
  35.                    t.start();  
  36.                    workerStarted = true 
  37.                }  
  38.            }  
  39.        } finally {  
  40.            if (! workerStarted)  
  41.                addWorkerFailed(w);  
  42.        }  
  43.        return workerStarted;  
  44.    } 

這部分邏輯其實(shí)比較容易理解,就是創(chuàng)建Worker并開(kāi)啟線程執(zhí)行任務(wù)的過(guò)程,Worker是對(duì)線程的封裝,創(chuàng)建的worker會(huì)被添加到ThreadPoolExecutor中的HashSet中。也就是線程池中的線程都維護(hù)在這個(gè)名為workers的HashSet中并被ThreadPoolExecutor所管理,HashSet中的線程可能處于正在工作的狀態(tài),也可能處于空閑狀態(tài),一旦達(dá)到指定的空閑時(shí)間,則會(huì)根據(jù)條件進(jìn)行回收線程。

我們知道,線程調(diào)用start后就會(huì)開(kāi)始執(zhí)行線程的邏輯代碼,執(zhí)行完后線程的生命周期就結(jié)束了,那么線程池是如何保證Worker執(zhí)行完任務(wù)后仍然不結(jié)束的呢?當(dāng)線程空閑超時(shí)或者關(guān)閉線程池又是怎樣進(jìn)行線程回收的呢?這個(gè)實(shí)現(xiàn)邏輯其實(shí)就在Worker中。看下Worker的代碼: 

  1. private final class Worker  
  2.        extends AbstractQueuedSynchronizer  
  3.        implements Runnable  
  4.    {  
  5.        // 執(zhí)行任務(wù)的線程  
  6.        final Thread thread;  
  7.        // 初始化Worker時(shí)傳進(jìn)來(lái)的任務(wù),可能為null,如果不空,則創(chuàng)建和立即執(zhí)行這個(gè)task,對(duì)應(yīng)核心線程創(chuàng)建的情況  
  8.        Runnable firstTask;  
  9.        Worker(Runnable firstTask) {  
  10.            // 初始化時(shí)設(shè)置setate為-1  
  11.            setState(-1); // inhibit interrupts until runWorker  
  12.            this.firstTask = firstTask;  
  13.            // 通過(guò)線程工程創(chuàng)建線程  
  14.            this.thread = getThreadFactory().newThread(this);  
  15.        }  
  16.        // 線程的真正執(zhí)行邏輯  
  17.        public void run() {  
  18.            runWorker(this);  
  19.        }  
  20.        // 判斷線程是否是獨(dú)占狀態(tài),如果不是意味著線程處于空閑狀態(tài)  
  21.        protected boolean isHeldExclusively() {  
  22.            return getState() != 0; 
  23.        }  
  24.        // 獲取鎖  
  25.        protected boolean tryAcquire(int unused) {  
  26.            if (compareAndSetState(0, 1)) {  
  27.                setExclusiveOwnerThread(Thread.currentThread());  
  28.                return true; 
  29.             }  
  30.            return false;  
  31.        }  
  32.        // 釋放鎖 
  33.         protected boolean tryRelease(int unused) {  
  34.            setExclusiveOwnerThread(null);  
  35.            setState(0);  
  36.            return true;  
  37.        }  
  38.        // ...  
  39.    } 

Worker是位于ThreadPoolExecutor中的一個(gè)內(nèi)部類,它繼承了AQS,使用AQS來(lái)實(shí)現(xiàn)了獨(dú)占鎖的功能,但是并沒(méi)支持可重入。這里使用不可重入的特性來(lái)表示線程的執(zhí)行狀態(tài),即可以通過(guò)isHeldExclusively方法來(lái)判斷,如果是獨(dú)占狀態(tài),說(shuō)明線程正在執(zhí)行任務(wù),如果非獨(dú)占狀態(tài),說(shuō)明線程處于空閑狀態(tài)。關(guān)于AQS我們前邊文章中已經(jīng)詳細(xì)分析過(guò)了,不了解AQS的可以翻看前邊ReentrantLock的文章。

另外,Worker還實(shí)現(xiàn)了Runnable接口,因此它的執(zhí)行邏輯就是在run方法中,run方法調(diào)用的是線程池中的runWorker(this)方法。任務(wù)的執(zhí)行邏輯就在runWorker方法中,它的代碼如下: 

  1. final void runWorker(Worker w) {  
  2.      Thread wt = Thread.currentThread();  
  3.      // 取出Worker中的任務(wù),可能為空  
  4.      Runnable task = w.firstTask;  
  5.      w.firstTask = null 
  6.      w.unlock(); // allow interrupts  
  7.      boolean completedAbruptly = true 
  8.      try {  
  9.          // task不為null或者阻塞隊(duì)列中有任務(wù),通過(guò)循環(huán)不斷的從阻塞隊(duì)列中取出任務(wù)執(zhí)行 
  10.          while (task != null || (task = getTask()) != null) {  
  11.              w.lock();  
  12.              // ...  
  13.              try {  
  14.                  // 任務(wù)執(zhí)行前的hook點(diǎn)  
  15.                  beforeExecute(wt, task);  
  16.                  try {  
  17.                      // 執(zhí)行任務(wù)  
  18.                      task.run();  
  19.                      // 任務(wù)執(zhí)行后的hook點(diǎn)  
  20.                      afterExecute(task, null);  
  21.                  } catch (Throwable ex) {  
  22.                      afterExecute(task, ex);  
  23.                      throw ex;  
  24.                  }  
  25.              } finally {  
  26.                  task = null 
  27.                  w.completedTasks++;  
  28.                  w.unlock();  
  29.              }  
  30.          }  
  31.          completedAbruptly = false 
  32.      } finally {  
  33.          // 超時(shí)沒(méi)有取到任務(wù),則回收空閑超時(shí)的線程  
  34.          processWorkerExit(w, completedAbruptly);  
  35.      }  
  36.  } 

可以看到,runWorker的核心邏輯就是不斷通過(guò)getTask方法從阻塞隊(duì)列中獲取任務(wù)并執(zhí)行.通過(guò)這樣的方式實(shí)現(xiàn)了線程的復(fù)用,避免了創(chuàng)建線程。這里要注意的是這里是一個(gè)“生產(chǎn)者-消費(fèi)者”模式,getTask是從阻塞隊(duì)列中取任務(wù),所以如果阻塞隊(duì)列中沒(méi)有任務(wù)的時(shí)候就會(huì)處于阻塞狀態(tài)。

getTask中通過(guò)判斷是否要回收線程而設(shè)置了等待超時(shí)時(shí)間,如果阻塞隊(duì)列中一直沒(méi)有任務(wù),那么在等待keepAliveTime時(shí)間后會(huì)拋出異常。最終會(huì)走到上述代碼的finally方法中,意味著有線程空閑時(shí)間超過(guò)了keepAliveTime時(shí)間,那么調(diào)用processWorkerExit方法移除Worker。processWorkerExit方法中沒(méi)有復(fù)雜難以理解的邏輯,這里就不再貼代碼了。我們重點(diǎn)看下getTask中是如何處理的,代碼如下: 

  1.  private Runnable getTask() {  
  2.     boolean timedOut = false; // Did the last poll() time out?  
  3.     for (;;) {  
  4.         int c = ctl.get();  
  5.         // ...   
  6.         // Flag1. 如果配置了allowCoreThreadTimeOut==true或者線程池中的線程數(shù)大于核心線程數(shù),則timed為true,表示開(kāi)啟指定線程超時(shí)后被回收  
  7.         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 
  8.         // ...  
  9.         try {  
  10.             // Flag2. 取出阻塞隊(duì)列中的任務(wù),注意如果timed為true,則會(huì)調(diào)用阻塞隊(duì)列的poll方法,并設(shè)置超時(shí)時(shí)間為keepAliveTime,如果超時(shí)沒(méi)有取到任務(wù)則會(huì)拋出異常。  
  11.             Runnable r = timed ?  
  12.                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  
  13.                 workQueue.take();  
  14.             if (r != null)  
  15.                 return r;  
  16.             timedOut = true 
  17.         } catch (InterruptedException retry) {  
  18.             timedOut = false 
  19.         }  
  20.     }  

重點(diǎn)看getTask是如何處理空閑超時(shí)的邏輯的。我們知道,回收線程的條件是線程大于核心線程數(shù)或者配置了allowCoreThreadTimeOut為true,當(dāng)線程空閑超時(shí)的情況下就會(huì)回收線程。上述代碼在Flag1處先判斷了如果線程池中的線程數(shù)大于核心線程數(shù),或者開(kāi)啟了allowCoreThreadTimeOut,那么就需要開(kāi)啟線程空閑超時(shí)回收。

所有在Flag2處,timed為true的情況下調(diào)用了阻塞隊(duì)列的poll方法,并傳入了超時(shí)時(shí)間為keepAliveTime,如果在keepAliveTime時(shí)間內(nèi),阻塞隊(duì)列一直為null那么久會(huì)拋出異常,結(jié)束runWorker的循環(huán)。進(jìn)而執(zhí)行runWorker方法中回收線程的操作。

這里需要我們理解阻塞隊(duì)列poll方法的使用,poll方法接受一個(gè)時(shí)間參數(shù),是一個(gè)阻塞操作,在給定的時(shí)間內(nèi)沒(méi)有獲取到數(shù)據(jù)就會(huì)拋出異常。其實(shí)說(shuō)白了,阻塞隊(duì)列就是一個(gè)使用ReentrantLock實(shí)現(xiàn)的“生產(chǎn)者-消費(fèi)者”模式,我們?cè)谏钊肜斫釰ava線程的等待與喚醒機(jī)制(二)這篇文章中使用ReentrantLock實(shí)現(xiàn)“生產(chǎn)者-消費(fèi)者”模型其實(shí)就是一個(gè)簡(jiǎn)單的阻塞隊(duì)列,與JDK中的BlockingQueue實(shí)現(xiàn)機(jī)制類似。感興趣的同學(xué)可以自己查看ArrayBlockingQueue等阻塞隊(duì)列的實(shí)現(xiàn),限于文章篇幅,這里就不再贅述了。

ThreadPoolExecutor的拒絕策略

上一小節(jié)中我們多次提到線程池的拒絕策略,它是在reject方法中實(shí)現(xiàn)的。實(shí)現(xiàn)代碼也非常簡(jiǎn)單,代碼如下: 

  1. final void reject(Runnable command) {  
  2.     handler.rejectedExecution(command, this);  

通過(guò)調(diào)用handler的rejectedExecution方法實(shí)現(xiàn)。這里其實(shí)就是運(yùn)用了策略模式,handler是一個(gè)RejectedExecutionHandler類型的成員變量,RejectedExecutionHandler是一個(gè)接口,只有一個(gè)rejectedExecution方法。在實(shí)例化線程池時(shí)構(gòu)造方法中傳入對(duì)應(yīng)的拒絕策略實(shí)例即可。前文已經(jīng)提到了Java提供的幾種默認(rèn)實(shí)現(xiàn)分別為DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy以及AbortPolicy。

以AbortPolicy直接拋出異常為例,來(lái)看下代碼實(shí)現(xiàn): 

  1. public static class AbortPolicy implements RejectedExecutionHandler {  
  2.    public AbortPolicy() { }  
  3.    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
  4.        throw new RejectedExecutionException("Task " + r.toString() +  
  5.                                             " rejected from " +  
  6.                                             e.toString());  
  7.    } 

可以看到直接在rejectedExecution方法中拋出了RejectedExecutionException來(lái)拒絕任務(wù)。其他的幾個(gè)策略實(shí)現(xiàn)也都比較簡(jiǎn)單,有興趣可以自己查閱代碼。

ThreadPoolExecutor的shutdown

調(diào)用shutdown方法后,會(huì)將線程池標(biāo)記為SHUTDOWN狀態(tài),上邊execute的源碼可以看出,只有線程池是RUNNING狀態(tài)才接受任務(wù),因此被標(biāo)記位SHUTDOWN后,再提交任務(wù)會(huì)被線程池拒絕。shutdown的代碼如下: 

  1. public void shutdown() {  
  2.     final ReentrantLock mainLock = this.mainLock;  
  3.     mainLock.lock();  
  4.     try {  
  5.         //檢查是否可以關(guān)閉線程  
  6.         checkShutdownAccess();  
  7.         // 將線程池狀態(tài)置為SHUTDOWN狀態(tài)  
  8.         advanceRunState(SHUTDOWN);  
  9.         // 嘗試中斷空閑線程  
  10.         interruptIdleWorkers();  
  11.         // 空方法,線程池關(guān)閉的hook點(diǎn) 
  12.          onShutdown();   
  13.     } finally {  
  14.         mainLock.unlock();  
  15.     }  
  16.     tryTerminate();  
  17.  
  18. private void interruptIdleWorkers() {  
  19.     interruptIdleWorkers(false);  
  20. }     

修改線程池為SHUTDOWN狀態(tài)后,會(huì)調(diào)用interruptIdleWorkers去中斷空閑線程線程,具體實(shí)現(xiàn)邏輯是在interruptIdleWorkers(boolean onlyOne)方法中,如下:   

  1. private void interruptIdleWorkers(boolean onlyOne) {  
  2.        final ReentrantLock mainLock = this.mainLock;  
  3.        mainLock.lock();  
  4.        try {  
  5.            for (Worker w : workers) {  
  6.                Thread t = w.thread;  
  7.                // 嘗試tryLock獲取鎖,如果拿鎖成功說(shuō)明線程是空閑狀態(tài)  
  8.                if (!t.isInterrupted() && w.tryLock()) {  
  9.                    try {  
  10.                        // 中斷線程  
  11.                        t.interrupt();  
  12.                    } catch (SecurityException ignore) {  
  13.                    } finally {  
  14.                        w.unlock();  
  15.                    } 
  16.                 }  
  17.                if (onlyOne)  
  18.                    break;  
  19.            }  
  20.        } finally {  
  21.            mainLock.unlock();  
  22.        }  
  23.    } 

shutdown的邏輯比較簡(jiǎn)單,里邊做了兩件比較重要的事情,即先將線程池狀態(tài)修改為SHUTDOWN,接著遍歷所有Worker,將空閑的Worker進(jìn)行中斷。

總結(jié)

本文深入的探究了線程池的工作流程和實(shí)現(xiàn)原理。就線程池的工作流程而言其實(shí)并不難以理解。但是在分析線程池的源碼時(shí),如果沒(méi)有很好的并發(fā)基礎(chǔ)的話,大概率是難以讀懂線程池的源碼的。因?yàn)榫€程池內(nèi)部使用了大量并發(fā)知識(shí),對(duì)任何一點(diǎn)用到的并發(fā)知識(shí)認(rèn)識(shí)不到位都會(huì)造成理解偏差。寫(xiě)這篇文章參看了很多的其他線程池的相關(guān)文章,幾乎沒(méi)有找到一篇能夠剖析清楚線程池源碼的文章。歸根結(jié)底還是沒(méi)能系統(tǒng)的理解Atomic、Lock與AQS、CAS、阻塞隊(duì)列等并發(fā)相關(guān)知識(shí)。 

 

責(zé)任編輯:龐桂玉 來(lái)源: Android技術(shù)之家
相關(guān)推薦

2023-05-29 08:12:38

2020-10-14 08:50:38

搞懂 Netty 線程

2023-10-18 10:55:55

HashMap

2022-08-26 13:24:03

version源碼sources

2021-10-11 11:58:41

Channel原理recvq

2021-10-09 19:05:06

channelGo原理

2022-09-26 00:48:14

線程池阻塞數(shù)據(jù)

2024-03-11 18:18:58

項(xiàng)目Spring線程池

2024-11-27 08:15:50

2021-07-08 10:08:03

DvaJS前端Dva

2012-05-15 02:18:31

Java線程池

2022-04-24 11:06:54

SpringBootjar代碼

2024-07-15 08:20:24

2022-04-11 10:56:43

線程安全

2025-04-21 04:00:00

2021-12-29 17:29:07

KubernetesEvents集群

2023-09-28 08:15:05

SpringBean加載

2018-10-31 15:54:47

Java線程池源碼

2020-04-28 22:12:30

Nginx正向代理反向代理

2024-10-15 17:12:38

代碼父子線程開(kāi)源
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 久久久久久亚洲精品 | 欧美性精品| 免费av毛片 | 精品一区在线看 | 亚洲精品99999 | 91亚洲国产 | 亚洲国产精品久久久久秋霞不卡 | 九一精品 | 日本午夜视频 | 九九热免费看 | 玖玖在线精品 | 国产精品v | 亚洲草草视频 | 日韩2020狼一二三 | 日本偷偷操 | 无毛av | 日韩欧美在线观看视频网站 | 蜜桃一区二区三区在线 | 精品国产不卡一区二区三区 | 在线观看毛片网站 | 中文字幕视频在线观看 | 精品亚洲一区二区三区四区五区高 | 国产成人99久久亚洲综合精品 | 男人天堂久久久 | 亚洲成人日韩 | av资源中文在线 | 亚洲综合天堂 | 国产高清视频一区 | 天堂资源最新在线 | 亚洲最大av网站 | 国产精品色 | 国产一区二区三区在线看 | 韩国av一区二区 | 欧美国产一区二区 | 91精品国产高清一区二区三区 | 色噜噜亚洲男人的天堂 | 亚洲精品在线观看网站 | av在线电影网站 | 婷婷久久综合 | 亚洲毛片一区二区 | 91天堂网|