深入線程池的問題連環(huán)炮
這一篇是看了這一篇文章之后用于個(gè)人的學(xué)習(xí)記錄,加入了一些個(gè)人的理解,其中一些圖片也是來源于這篇文章https://mp.weixin.qq.com/s/NDOx94yY06OnHjrYq2lVYw
1、為什么會(huì)有線程池
JVM中的一個(gè)線程即對(duì)應(yīng)一個(gè)操作系統(tǒng)的線程,也就是JVM的線程是由操作系統(tǒng)創(chuàng)建而來,創(chuàng)建線程和銷毀線程這些都需要操作系統(tǒng)來分別賦予資源和釋放資源等
也就意味著創(chuàng)建線程變成了一個(gè)比較重的操作
我們可以利用多線程去進(jìn)行不同的工作,更高效的利用CPU資源,但是這并不意味著線程數(shù)量越多越好
我們的時(shí)代已經(jīng)由原來的單核時(shí)代變成現(xiàn)在的多核時(shí)代了,這個(gè)核指的就是CPU,在原來的單核時(shí)代,如果一個(gè)線程一直是運(yùn)算的邏輯過程,也就不涉及到線程的切換,因?yàn)檫@個(gè)線程一直在占用CPU,也就是屬于計(jì)算密集型
但是如果這個(gè)線程屬于IO密集型,也就是這個(gè)線程很多的時(shí)間都是在等待IO操作和處理IO操作,這樣就浪費(fèi)了CPU這個(gè)大腦的處理能力了
于是就有了多線程,一個(gè)線程等待IO操作,另一個(gè)線程可以頂上,充分利用了CPU的資源
隨著多核時(shí)代的到來,對(duì)于這個(gè)CPU高效利用也就變得更加迫切,CPU的核心越來越多,能同時(shí)運(yùn)行的線程數(shù)越來越多了,也就意味著此時(shí)的多線程并不只是去提高單核的處理能力,更是為了充分利用這個(gè)多核的大腦
但 CPU 的核心數(shù)有限,同時(shí)能運(yùn)行的線程數(shù)有限,所以需要根據(jù)調(diào)度算法切換執(zhí)行的線程,而線程的切換需要開銷,比如替換寄存器的內(nèi)容、高速緩存的失效等等。
如果線程數(shù)太多,切換的頻率就變高,可能使得多線程帶來的好處抵不過線程切換帶來的開銷,得不償失。
因此線程的數(shù)量需要得以控制
2、什么是線程池
線程的數(shù)量太少無法充分利用CPU,線程數(shù)太多的話會(huì)導(dǎo)致頻繁切換線程,上下文切換消耗資源,我們需要根據(jù)系統(tǒng)資源和業(yè)務(wù)性能來決定線程數(shù)量
而線程的創(chuàng)建又是屬于一個(gè)比較重的操作,所以我們想到的就是緩存一批線程,這種思想大家都明白應(yīng)該,就像是數(shù)據(jù)庫某張表需要經(jīng)常查詢,造成DB壓力過大,我們就先把經(jīng)常訪問訪問的數(shù)據(jù)放入到緩存中,用于緩解對(duì)于DB的訪問壓力
這個(gè)也是類似的道理,每次去新建和銷毀線程比較重,我們就可以通過緩存這些線程來減輕不必要的消耗
線程的數(shù)量我們需要根據(jù)硬件的資源和線程要執(zhí)行的任務(wù)這些等綜合來決定
高并發(fā)、任務(wù)執(zhí)行時(shí)間短的業(yè)務(wù),線程池線程數(shù)可以設(shè)置為CPU核數(shù)+1,減少線程上下文的切換
并發(fā)不高、任務(wù)執(zhí)行時(shí)間長的業(yè)務(wù)要分情況來討論
假如是業(yè)務(wù)時(shí)間長集中在IO操作上,也就是IO密集型的任務(wù),因?yàn)镮O操作并不占用CPU,所以不要讓所有的CPU閑下來,可以加大線程池中的線程數(shù)目,讓CPU處理更多的業(yè)務(wù)
假如是業(yè)務(wù)時(shí)間長集中在計(jì)算操作上,也就是計(jì)算密集型任務(wù),這個(gè)就沒辦法了,線程數(shù)設(shè)置為CPU核數(shù)+1,線程池中的線程數(shù)設(shè)置得少一些,減少線程上下文的切換
并發(fā)高、業(yè)務(wù)執(zhí)行時(shí)間長,解決這種類型任務(wù)的關(guān)鍵不在于線程池而在于整體架構(gòu)的設(shè)計(jì),看看這些業(yè)務(wù)里面某些數(shù)據(jù)是否能做緩存是第一步,增加服務(wù)器是第二步,至于線程池的設(shè)置,參考上面的設(shè)置即可。最后,業(yè)務(wù)執(zhí)行時(shí)間長的問題,也可能需要分析一下,看看能不能使用中間件對(duì)任務(wù)進(jìn)行拆分和解耦。
大家應(yīng)該都聽過對(duì)象池、連接池這些,池化的技術(shù)就是通過在池子里取出資源,然是使用完再放回到池子里,而線程池這一點(diǎn)稍微不太一樣,這里線程池相對(duì)來說更黑盒一些
不是我們從線程池中取線程使用,而是直接往線程池里扔任務(wù),然后線程池幫我們?nèi)?zhí)行
3、實(shí)現(xiàn)線程池
線程池內(nèi)部也是一個(gè)典型的生產(chǎn)者-消費(fèi)者模型
線程池內(nèi)部有一個(gè)存放任務(wù)列表的隊(duì)列,而內(nèi)部會(huì)不斷的有線程去隊(duì)列中取任務(wù)來執(zhí)行,用來消費(fèi)
來看一個(gè)簡易版的線程池實(shí)現(xiàn),這段代碼同樣來源于上面的博文
首先線程池內(nèi)需要定義兩個(gè)成員變量,分別是阻塞隊(duì)列和線程列表,然后自定義線程使它的任務(wù)就是不斷的從阻塞隊(duì)列中拿任務(wù)然后執(zhí)行。
- @Slf4j
- public class YesThreadPool {
- BlockingQueue<Runnable> taskQueue; //存放任務(wù)的阻塞隊(duì)列
- List<YesThread> threads; //線程列表
- YesThreadPool(BlockingQueue<Runnable> taskQueue, int threadSize) {
- this.taskQueue = taskQueue;
- threads = new ArrayList<>(threadSize);
- // 初始化線程,并定義名稱
- IntStream.rangeClosed(1, threadSize).forEach((i)-> {
- YesThread thread = new YesThread("yes-task-thread-" + i);
- thread.start();
- threads.add(thread);
- });
- }
- //提交任務(wù)只是往任務(wù)隊(duì)列里面塞任務(wù)
- public void execute(Runnable task) throws InterruptedException {
- taskQueue.put(task);
- }
- class YesThread extends Thread { //自定義一個(gè)線程
- public YesThread(String name) {
- super(name);
- }
- @Override
- public void run() {
- while (true) { //死循環(huán)
- Runnable task = null;
- try {
- task = taskQueue.take(); //不斷從任務(wù)隊(duì)列獲取任務(wù)
- } catch (InterruptedException e) {
- logger.error("記錄點(diǎn)東西.....", e);
- }
- task.run(); //執(zhí)行
- }
- }
- }
- }
當(dāng)然,這只是個(gè)最簡易版的,也有很多可以優(yōu)化的點(diǎn)
4、線程池核心參數(shù)
第1個(gè)參數(shù):設(shè)置核心線程數(shù)。默認(rèn)情況下核心線程會(huì)一直存活
第2個(gè)參數(shù):設(shè)置最大線程數(shù)。決定線程池最多可以創(chuàng)建的多少線程
第3個(gè)參數(shù)和第4個(gè)參數(shù):用來設(shè)置線程空閑時(shí)間,和空閑時(shí)間的單位,當(dāng)線程閑置超過空閑時(shí)間就會(huì)被銷毀??梢酝ㄟ^AllowCoreThreadTimeOut方法來允許核心線程被回收
第5個(gè)參數(shù):設(shè)置緩沖隊(duì)列,圖中左下方的三個(gè)隊(duì)列是設(shè)置線程池時(shí)常使用的緩沖隊(duì)列
其中Array Blocking Queue是一個(gè)有界隊(duì)列,就是指隊(duì)列有最大容量限制。Linked Blocking Queue是無界隊(duì)列,就是隊(duì)列不限制容量。最后一個(gè)是Synchronous Queue,是一個(gè)同步隊(duì)列,內(nèi)部沒有緩沖區(qū)
第6個(gè)參數(shù):設(shè)置線程池工廠方法,線程工廠用來創(chuàng)建新線程,可以用來對(duì)線程的一些屬性進(jìn)行定制,例如線程的Group、線程名、優(yōu)先級(jí)等。一般使用默認(rèn)工廠類即可
第7個(gè)參數(shù):設(shè)置線程池滿時(shí)的拒絕策略
ThreadPoolExecutor默認(rèn)有四個(gè)拒絕策略:
- ThreadPoolExecutor.AbortPolicy() 直接拋出異常RejectedExecutionException,這個(gè)是默認(rèn)的拒絕策略
- ThreadPoolExecutor.CallerRunsPolicy() 直接在提交失敗時(shí),由提交任務(wù)的線程直接執(zhí)行提交的任務(wù)
- ThreadPoolExecutor.DiscardPolicy() 直接丟棄后來的任務(wù)
- ThreadPoolExecutor.DiscardOldestPolicy() 丟棄在隊(duì)列中最早提交的任務(wù)
5、線程池原理
我們向線程提交任務(wù)時(shí)可以使用Execute和Submit,區(qū)別就是Submit可以返回一個(gè)Future對(duì)象,通過Future對(duì)象可以了解任務(wù)執(zhí)行情況,可以取消任務(wù)的執(zhí)行,還可獲取執(zhí)行結(jié)果或執(zhí)行異常。Submit最終也是通過Execute執(zhí)行的
線程池提交任務(wù)時(shí)的執(zhí)行順序如下:
向線程池提交任務(wù)時(shí),會(huì)首先判斷線程池中的線程數(shù)是否大于設(shè)置的核心線程數(shù),如果不大于,就創(chuàng)建一個(gè)核心線程來執(zhí)行任務(wù)。
如果大于核心線程數(shù),就會(huì)判斷緩沖隊(duì)列是否滿了,如果沒有滿,則放入隊(duì)列,等待線程空閑時(shí)執(zhí)行任務(wù)。
如果隊(duì)列已經(jīng)滿了,則判斷是否達(dá)到了線程池設(shè)置的最大線程數(shù),如果沒有達(dá)到,就創(chuàng)建新線程來執(zhí)行任務(wù)。
如果已經(jīng)達(dá)到了最大線程數(shù),則執(zhí)行指定的拒絕策略。這里需要注意隊(duì)列的判斷與最大線程數(shù)判斷的順序,不要搞反
線程池中的線程并不是一開始就將活躍線程直接拉滿的,而是隨著用的數(shù)量的增加,才會(huì)逐步增加線程的,這是一種懶加載思想
但是這里有一個(gè)靈魂問題,沒研究的小伙伴肯定是不知道的
6、當(dāng)線程數(shù)小于活躍線程數(shù)的時(shí)候,并且線程數(shù)都處于空閑狀態(tài),現(xiàn)在提交一個(gè)任務(wù),是新起一個(gè)線程還是用之前的線程來執(zhí)行該任務(wù)?
李老是這樣說的:
- If fewer than corePoolSize threads are running, try to start
- a new thread with the given command as its first task.
也就是無論其余線程是否空閑,只要此時(shí)線程數(shù)量小于核心線程數(shù)量,就會(huì)通過啟動(dòng)一個(gè)線程來執(zhí)行該任務(wù)
線程池是懶加載的,但是這里又顯得很勤快
也就是線程池是想要快速擁有核心線程數(shù)量的線程,這個(gè)作為線程池的中堅(jiān)力量
而最大線程數(shù)其實(shí)是為了應(yīng)付突發(fā)狀況。
舉個(gè)裝修的例子,正常情況下施工隊(duì)只要 5 個(gè)人去干活,這 5 人其實(shí)就是核心線程,但是由于工頭接的活太多了,導(dǎo)致 5 個(gè)人在約定工期內(nèi)干不完,所以工頭又去找了 2 個(gè)人來一起干,所以 5 是核心線程數(shù),7 是最大線程數(shù)。
平時(shí)就是 5 個(gè)人干活,特別忙的時(shí)候就找 7 個(gè),等閑下來就會(huì)把多余的 2 個(gè)辭了
7、看到這里你可能會(huì)覺得核心線程在線程池里面會(huì)有特殊標(biāo)記?
并沒有,不論是核心還是非核心線程,在線程池里面都是一視同仁,當(dāng)淘汰的時(shí)候不會(huì)管是哪些線程,反正留下核心線程數(shù)個(gè)線程即可
8、你是怎么理解 KeepAliveTime 的?
線程池的重點(diǎn)是保留核心數(shù)量的線程,但是會(huì)預(yù)留一些線程來用于突發(fā)情況,當(dāng)突發(fā)情況過去之后,還是只想保留核心線程,所以這個(gè)時(shí)候就通過這個(gè)時(shí)間來控制
當(dāng)線程數(shù)量大于核心線程數(shù)量的時(shí)候,并且空閑時(shí)間超過KeepAliveTime的時(shí)候,就回收線程,直到線程數(shù)量和核心數(shù)量持平為止
看了上面的線程池的邏輯,不知道大家有沒有產(chǎn)生一個(gè)疑問
為什么要把任務(wù)先放在任務(wù)隊(duì)列里面,而不是把線程先拉滿到最大線程數(shù)?
這里我先說下我的個(gè)人理解
線程池的重點(diǎn)應(yīng)該是核心線程池,而當(dāng)線程數(shù)量不夠處理的時(shí)候,先放到隊(duì)列中也是屬于一種緩沖的思想,因?yàn)槲覀冊(cè)谠O(shè)計(jì)核心線程數(shù)量的時(shí)候都是考慮的盡可能的最優(yōu)的數(shù)量,所以重點(diǎn)也就變成了盡力去維持核心線程的數(shù)量
而隊(duì)列是可以自定義數(shù)量的,我們可以通過控制隊(duì)列的長度,來控制我們可以接受的任務(wù)堆積的程度,只有當(dāng)任務(wù)堆積無法忍受的時(shí)候,才會(huì)繼續(xù)去啟動(dòng)新的線程來執(zhí)行這些任務(wù)
當(dāng)我看了Yes大佬的看法之后,發(fā)現(xiàn)也是這樣理解的,但是解釋的更深一些,我來和大家解釋下
原生版線程池的實(shí)現(xiàn)可以認(rèn)為是偏向 CPU 密集的,也就是當(dāng)任務(wù)過多的時(shí)候不是先去創(chuàng)建更多的線程,而是先緩存任務(wù),讓核心線程去消化,從上面的分析我們可以知道,當(dāng)處理 CPU 密集型任務(wù)的時(shí),線程太多反而會(huì)由于線程頻繁切換的開銷而得不償失,所以優(yōu)先堆積任務(wù)而不是創(chuàng)建新的線程。
而像 Tomcat 這種業(yè)務(wù)場景,大部分情況下是需要大量 I/O 處理的情況就做了一些定制,修改了原生線程池的實(shí)現(xiàn),使得在隊(duì)列沒滿的時(shí)候,可以創(chuàng)建線程至最大線程數(shù)。
9、如何修改原生線程池,使得可以先拉滿線程數(shù)再入任務(wù)隊(duì)列排隊(duì)?
這里的邏輯其實(shí)上面也說過了,大家看一下源碼就懂了,首先判斷的是工作線程是否小于核心線程,當(dāng)工作線程小于核心線程時(shí),直接增加線程數(shù)量來執(zhí)行任務(wù)
當(dāng)達(dá)到核心線程數(shù)量的時(shí)候,則判斷線程池是否在運(yùn)行中,在運(yùn)行中即執(zhí)行入隊(duì)操作
接下來一起看看Tomcat實(shí)現(xiàn)線程池的邏輯
- public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor
可以看到先繼承了 JUC 的線程池,然后我們重點(diǎn)關(guān)注一下 execute 這個(gè)方法
這里可以看到增加了submittedCount作為任務(wù)數(shù)的統(tǒng)計(jì),統(tǒng)計(jì)所有未完成的任務(wù)數(shù)量
首先調(diào)用原生過程,如果捕獲到拒絕的異常,則判斷隊(duì)列類型,不正確,丟棄該任務(wù),任務(wù)數(shù)量減一。
然后執(zhí)行再次入隊(duì)列,試圖增加一次挽救的機(jī)會(huì),入隊(duì)失敗,任務(wù)數(shù)量減一,最后處理捕獲異常,任務(wù)數(shù)量減一
然后我們?cè)賮砜聪麓a里出現(xiàn)的 TaskQueue,這個(gè)就是上面提到的定制關(guān)鍵點(diǎn)了。
可以看到這個(gè)任務(wù)隊(duì)列繼承了 LinkedBlockingQueue,并且有個(gè) ThreadPoolExecutor 類型的成員變量 parent ,我們?cè)賮砜聪?offer 方法的實(shí)現(xiàn),這里就是修改原來線程池任務(wù)提交與線程創(chuàng)建邏輯的核心了。
這里就是對(duì)于offer邏輯進(jìn)行了加強(qiáng),我們看一下
先是如果沒有線程實(shí)例,則直接按照原方法執(zhí)行
接著判斷如果線程數(shù)量是最大線程數(shù)量,直接入隊(duì)
未完成的任務(wù)數(shù)小于線程數(shù),證明此時(shí)還有閑著摸魚的線程,直接入隊(duì)即可,會(huì)自動(dòng)消費(fèi)
到最后,也就意味著此時(shí)核心線程都在運(yùn)行,此時(shí)判斷線程數(shù)量是否小于最大線程數(shù)量,如果小于,這里就直接返回false即可,這個(gè)false就映射了上面ThreadPoolExecutor中的execute方法中的offer,然后便會(huì)執(zhí)行相應(yīng)的增加線程的操作,而不是先選擇入隊(duì)
10、原生線程池的核心線程一定要伴隨著任務(wù)慢慢創(chuàng)建嗎
既然這么問了,答案肯定是否定的,線程池中提供了
線程池提供了兩個(gè)方法:
- prestartCoreThread:啟動(dòng)一個(gè)核心線程
- prestartAllCoreThreads :啟動(dòng)所有核心線程
不要小看這個(gè)預(yù)創(chuàng)建方法,預(yù)熱很重要,不然剛重啟的一些服務(wù)有時(shí)是頂不住瞬時(shí)請(qǐng)求的,就立馬崩了,所以有預(yù)熱線程、緩存等等操作。
- /**
- * Starts a core thread, causing it to idly wait for work. This
- * overrides the default policy of starting core threads only when
- * new tasks are executed. This method will return {@code false}
- * if all core threads have already been started.
- * @return {@code true} if a thread was started
- */
- public boolean prestartCoreThread() {
- return workerCountOf(ctl.get()) < corePoolSize &&
- addWorker(null, true);
- }
- /**
- * Starts all core threads, causing them to idly wait for work. This
- * overrides the default policy of starting core threads only when
- * new tasks are executed.
- * @return the number of threads started
- */
- public int prestartAllCoreThreads() {
- int n = 0;
- while (addWorker(null, true))
- ++n;
- return n;
- }
11、線程池的核心線程在空閑的時(shí)候一定不會(huì)被回收嗎?
有個(gè)allowCoreThreadTimeOut方法,把它設(shè)置為true ,則所有線程都會(huì)超時(shí),不會(huì)有核心數(shù)那條線的存在。
12、線程池的關(guān)閉方法shutdown和shutdownNow
關(guān)閉線程池的方法,一個(gè)是安全的關(guān)閉線程池,會(huì)等待任務(wù)都執(zhí)行完畢,一個(gè)是粗暴的直接咔嚓了所有線程,管你在不在運(yùn)行,兩個(gè)方法分別調(diào)用的就是 interruptIdleWorkers() 和 interruptWorkers() 來中斷線程
- /**
- * Initiates an orderly shutdown in which previously submitted
- * tasks are executed, but no new tasks will be accepted.
- * Invocation has no additional effect if already shut down.
- * <p>This method does not wait for previously submitted tasks to
- * complete execution. Use {@link #awaitTermination awaitTermination}
- * to do that.
- * @throws SecurityException {@inheritDoc}
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();
- advanceRunState(SHUTDOWN);
- interruptIdleWorkers();
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- }
- **
- * Attempts to stop all actively executing tasks, halts the
- * processing of waiting tasks, and returns a list of the tasks
- * that were awaiting execution. These tasks are drained (removed)
- * from the task queue upon return from this method
- * <p>This method does not wait for actively executing tasks to
- * terminate. Use {@link #awaitTermination awaitTermination} to
- * do that.
- * <p>There are no guarantees beyond best-effort attempts to stop
- * processing actively executing tasks. This implementation
- * cancels tasks via {@link Thread#interrupt}, so any task that
- * fails to respond to interrupts may never terminate
- * @throws SecurityException {@inheritDoc}
- */
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();
- advanceRunState(STOP);
- interruptWorkers();
- tasks = drainQueue();
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- return tasks;
- }
這又可以引申出一個(gè)問題,shutdownNow 了之后還在任務(wù)隊(duì)列中的任務(wù)咋辦?眼尖的小伙伴應(yīng)該已經(jīng)看到了,線程池還算負(fù)責(zé),把未執(zhí)行的任務(wù)拖拽到了一個(gè)列表中然后返回,至于怎么處理,就交給調(diào)用者了
13、你肯定知道線程池里的 ctl 是干嘛的咯?
其實(shí)看下注釋就很清楚了,ctl 是一個(gè)涵蓋了兩個(gè)概念的原子整數(shù)類,它將工作線程數(shù)和線程池狀態(tài)結(jié)合在一起維護(hù),低 29 位存放 workerCount,高 3 位存放 runState
其實(shí)并發(fā)包中有很多實(shí)現(xiàn)都是一個(gè)字段存多個(gè)值的,比如讀寫鎖的高 16 位存放讀鎖,低 16 位存放寫鎖,這種一個(gè)字段存放多個(gè)值可以更容易的維護(hù)多個(gè)值之間的一致性,也算是極簡主義
14、線程池有幾種狀態(tài)嗎?
注解說的很明白,我再翻譯一下:
RUNNING:能接受新任務(wù),并處理阻塞隊(duì)列中的任務(wù)
SHUTDOWN:不接受新任務(wù),但是可以處理阻塞隊(duì)列中的任務(wù)
STOP:不接受新任務(wù),并且不處理阻塞隊(duì)列中的任務(wù),并且還打斷正在運(yùn)行任務(wù)的線程,就是直接撂擔(dān)子不干了!
TIDYING:所有任務(wù)都終止,并且工作線程也為0,處于關(guān)閉之前的狀態(tài)
TERMINATED:已關(guān)閉
15、線程池的狀態(tài)是如何變遷的嗎?