讀完 RocketMQ 源碼,我學(xué)會了如何優(yōu)雅的創(chuàng)建線程
RocketMQ 是一款開源的分布式消息系統(tǒng),基于高可用分布式集群技術(shù),提供低延時、高可靠的消息發(fā)布與訂閱服務(wù)。
這篇文章,筆者整理了 RocketMQ 源碼中創(chuàng)建線程的幾點技巧,希望大家讀完之后,能夠有所收獲。
一、創(chuàng)建單線程
首先我們先溫習(xí)下常用的創(chuàng)建單線程的兩種方式:
實現(xiàn) Runnable 接口
繼承 Thread 類
1.實現(xiàn) Runnable 接口
圖中,MyRunnable 類實現(xiàn)了 Runnable 接口的 run 方法,run 方法中定義具體的任務(wù)代碼或處理邏輯,而Runnable 對象是作為線程構(gòu)造函數(shù)的參數(shù)。
2.繼承 Thread 類
線程實現(xiàn)類直接繼承 Thread ,本質(zhì)上也是實現(xiàn) Runnable 接口的 run 方法。
二、單線程抽象類
創(chuàng)建單線程的兩種方式都很簡單,但每次創(chuàng)建線程代碼顯得有點冗余,于是 RocketMQ 里實現(xiàn)了一個抽象類 ServiceThread 。
抽象類 ServiceThread
我們可以看到抽象類中包含了如下核心方法:
- 定義線程名;
- 啟動線程;
- 關(guān)閉線程。
下圖展示了 RocketMQ 眾多的單線程實現(xiàn)類。
實現(xiàn)類的編程模版類似 :
我們僅僅需要繼承抽象類,并實現(xiàn) getServiceName 和 run 方法即可。啟動的時候,調(diào)用 start 方法 , 關(guān)閉的時候調(diào)用 shutdown 方法。
三、線程池原理
線程池是一種基于池化思想管理線程的工具,線程池維護著多個線程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。這避免了在處理短時間任務(wù)時創(chuàng)建與銷毀線程的代價。線程池不僅能夠保證內(nèi)核的充分利用,還能防止過分調(diào)度。
JDK中提供的 ThreadPoolExecutor 類,是我們最常使用的線程池類。
ThreadPoolExecutor構(gòu)造函數(shù)
參數(shù)名 | 作用 |
corePoolSize | 隊列沒滿時,線程最大并發(fā)數(shù) |
maximumPoolSizes | 隊列滿后線程能夠達到的最大并發(fā)數(shù) |
keepAliveTime | 空閑線程過多久被回收的時間限制 |
unit | keepAliveTime 的時間單位 |
workQueue | 阻塞的隊列類型 |
threadPoolFactory | 改變線程的名稱、線程組、優(yōu)先級、守護進程狀態(tài) |
RejectedExecutionHandler | 超出 maximumPoolSizes + workQueue 時,任務(wù)會交給RejectedExecutionHandler來處理 |
任務(wù)的調(diào)度通過執(zhí)行 execute方法完成,方法的核心流程如下:
- 如果 workerCount < corePoolSize,創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù)。
- 如果 workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊列未滿,則將任務(wù)添加到該阻塞隊列中。
- 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊列已滿,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù)。
- 如果 workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊列已滿, 則根據(jù)拒絕策略來處理該任務(wù), 默認的處理方式是直接拋異常。
四、線程池封裝
在 RocketMQ 里 ,網(wǎng)絡(luò)請求都會攜帶命令編碼,每種命令映射對應(yīng)的處理器,而處理器又會注冊對應(yīng)的線程池。
當服務(wù)端 Broker 接收到發(fā)送消息命令時,都會有單獨的線程池 sendMessageExecutor 來處理這種命令請求。
基于 ThreadPoolExecutor 做了一個簡單的封裝 ,BrokerFixedThreadPoolExecutor 構(gòu)造函數(shù)包含六個核心參數(shù):
- 核心線程數(shù)和最大線程數(shù)相同 ,數(shù)量是:cpu核數(shù)和4比較后的最小值;
- 空閑線程的回收的時間限制,默認1分鐘;
- 發(fā)送消息隊列,有界隊列,默認10000;
- 線程工廠 ThreadFactoryImpl ,定義了線程名前綴:SendMessageThread_ 。
RocketMQ 實現(xiàn)了一個簡單的線程工廠:ThreadFactoryImpl,線程工廠可以定義線程名稱,以及是否是守護線程 。
線程工廠
開源項目 Cobar ,Xmemcached,Metamorphosis 中都有類似線程工廠的實現(xiàn) 。
五、線程名很重要
線程名很重要,線程名很重要,線程名很重要 ,重要的事情說三遍。
我們看到 RocketMQ 中,無論是單線程抽象類還是多線程的封裝都會配置線程名 ,因為通過線程名,非常容易定位問題,從而大大提升解決問題的效率。
定位的媒介常見有兩種:日志文件和堆棧記錄。
1.日志文件
經(jīng)常處理業(yè)務(wù)問題的同學(xué),一定都經(jīng)常與日志打交道。
查看 ERROR 日志,追溯到執(zhí)行線程, 要是線程池隔離做的好,基本可以判斷出哪種業(yè)務(wù)場景出了問題;
通過查看線程打印的日志,推斷線程調(diào)度是否正常,比如有的定時任務(wù)線程打印了開始,沒有打印結(jié)束,推論當前線程可能已經(jīng)掛掉或者阻塞。
2.堆棧記錄
jstack 是 java 虛擬機自帶的一種堆棧跟蹤工具 ,主要用來查看 Java 線程的調(diào)用堆棧,線程快照包含當前 java 虛擬機內(nèi)每一條線程正在執(zhí)行的方法堆棧的集合,可以用來分析線程問題。
jstack -l 進程pid
筆者查看線程堆棧,一般關(guān)注如下幾點:
當前 jvm 進程中的線程數(shù)量和線程分類是否在預(yù)期的范圍內(nèi);
系統(tǒng)接口超時或者定時任務(wù)停止的異常場景下 ,分析堆棧中是否有鎖未釋放,或者線程一直等待網(wǎng)絡(luò)通訊響應(yīng);
分析 jvm 進程中哪個線程占用的 CPU 最高。
六、總結(jié)
本文是RocketMQ 系列文章的開篇,和朋友們簡單聊聊 RocketMQ 源碼里創(chuàng)建線程的技巧。
單線程抽象類 ServiceThread使用者只需要實現(xiàn)業(yè)務(wù)邏輯以及定義線程名即可 ,不需要寫冗余的代碼。
線程池封裝適當封裝,定義線程工廠,并合理配置線程池參數(shù)。
線程名很重要文件日志,堆棧記錄配合線程名能大大提升解決問題的效率。
RocketMQ 的多線程編程技巧很多,比如線程通訊,并發(fā)控制,線程模型等等,后續(xù)的文章會一一為大家展現(xiàn)。