8000字 + 25圖探秘Xxl-Job核心架構原理
核心概念
這里還是老樣子,為了保證文章的完整性和連貫性,方便那些沒有使用過的小伙伴更加容易接受文章的內容,快速講一講Xxl-Job中的概念和使用
如果你已經使用過了,可直接跳過本節和下一節,快進到后面原理部分講解
1、調度中心
調度中心是一個單獨的Web服務,主要是用來觸發定時任務的執行
它提供了一些頁面操作,我們可以很方便地去管理這些定時任務的觸發邏輯
調度中心依賴數據庫,所以數據都是存在數據庫中的
調度中心也支持集群模式,但是它們所依賴的數據庫必須是同一個
所以同一個集群中的調度中心實例之間是沒有任何通信的,數據都是通過數據庫共享的
圖片
2、執行器
執行器是用來執行具體的任務邏輯的
執行器你可以理解為就是平時開發的服務,一個服務實例對應一個執行器實例
每個執行器有自己的名字,為了方便,你可以將執行器的名字設置成服務名
3、任務
任務什么意思就不用多說了
一個執行器中也是可以有多個任務的
總的來說,調用中心是用來控制定時任務的觸發邏輯,而執行器是具體執行任務的,這是一種任務和觸發邏輯分離的設計思想,這種方式的好處就是使任務更加靈活,可以隨時被調用,還可以被不同的調度規則觸發。
圖片
來個Demo
1、搭建調度中心
調度中心搭建很簡單,先下載源碼
然后改一下數據庫連接信息,執行一下在項目源碼中的/doc/db下的sql文件
圖片
啟動可以打成一個jar包,或者本地啟動就是可以的
啟動完成之后,訪問下面這個地址就可以訪問到控制臺頁面了
http://localhost:8080/xxl-job-admin/toLogin
用戶名密碼默認是 admin/123456
2、執行器和任務添加
添加一個名為sanyou-xxljob-demo執行器
圖片
任務添加
圖片
執行器選擇我們剛剛添加的,指定任務名稱為TestJob,corn表達式的意思是每秒執行一次
創建完之后需要啟動一下任務,默認是關閉狀態,也就不會執行
圖片
創建執行器和任務其實就是CRUD,并沒有復雜的業務邏輯
按照如上配置的整個Demo的意思就是
每隔1s,執行一次sanyou-xxljob-demo這個執行器中的TestJob任務
3、創建執行器和任務
引入依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
配置XxlJobSpringExecutor這個Bean
@Configuration
public class XxlJobConfiguration {
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
//設置調用中心的連接地址
xxlJobSpringExecutor.setAdminAddresses("http://localhost:8080/xxl-job-admin");
//設置執行器的名稱
xxlJobSpringExecutor.setAppname("sanyou-xxljob-demo");
//設置一個端口,后面會講作用
xxlJobSpringExecutor.setPort(9999);
//這個token是保證訪問安全的,默認是這個,當然可以自定義,
// 但需要保證調度中心配置的xxl.job.accessToken屬性跟這個token是一樣的
xxlJobSpringExecutor.setAccessToken("default_token");
//任務執行日志存放的目錄
xxlJobSpringExecutor.setLogPath("./");
return xxlJobSpringExecutor;
}
}
XxlJobSpringExecutor這個類的作用,后面會著重講
通過@XxlJob指定一個名為TestJob的任務,這個任務名需要跟前面頁面配置的對應上
@Component
public class TestJob {
private static final Logger logger = LoggerFactory.getLogger(TestJob.class);
@XxlJob("TestJob")
public void testJob() {
logger.info("TestJob任務執行了。。。");
}
}
所以如果順利的話,每隔1s鐘就會打印一句TestJob任務執行了。。。
啟動項目,注意修改一下端口,因為調用中心默認也是8080,本地起會端口沖突
最終執行結果如下,符合預期
圖片
講完概念和使用部分,接下來就來好好講一講Xxl-Job核心的實現原理
從執行器啟動說起
前面Demo中使用到了一個很重要的一個類
XxlJobSpringExecutor
這個類就是整個執行器啟動的入口
圖片
這個類實現了SmartInitializingSingleton接口
所以經過Bean的生命周期,一定會調用afterSingletonsInstantiated這個方法的實現
這個方法干了很多初始化的事,這里我挑三個重要的講,其余的等到具體的功能的時候再提
1、初始化JobHandler
JobHandler是個什么?
所謂的JobHandler其實就是一個定時任務的封裝
圖片
一個定時任務會對應一個JobHandler對象
當執行器執行任務的時候,就會調用JobHandler的execute方法
JobHandler有三種實現:
- MethodJobHandler
- GlueJobHandler
- ScriptJobHandler
MethodJobHandler是通過反射來調用方法執行任務
圖片
所以MethodJobHandler的任務的實現就是一個方法,剛好我們demo中的例子任務其實就是一個方法
所以Demo中的任務最終被封裝成一個MethodJobHandler
GlueJobHandler比較有意思,它支持動態修改任務執行的代碼
當你在創建任務的時候,需要指定運行模式為GLUE(Java)
圖片
之后需要在操作按鈕點擊GLUE IDE編寫Java代碼
圖片
代碼必須得實現IJobHandler接口,之后任務執行的時候就會執行execute方法的實現
如果你需要修改任務的邏輯,只需要重新編輯即可,不需要重啟服務
ScriptJobHandler,通過名字也可以看出,是專門處理一些腳本的
運行模式除了BEAN和GLUE(Java)之外,其余都是腳本模式
而本節的主旨,所謂的初始化JobHandler就是指,執行器啟動的時候會去Spring容器中找到加了@XxlJob注解的Bean
解析注解,然后封裝成一個MethodJobHandler對象,最終存到XxlJobSpringExecutor成員變量的一個本地的Map緩存中
圖片
緩存key就是任務的名字
圖片
至于GlueJobHandler和ScriptJobHandler都是任務觸發時才會創建
除了上面這幾種,你也自己實現JobHandler,手動注冊到JobHandler的緩存中,也是可以通過調度中心觸發的
2、創建一個Http服務器
除了初始化JobHandler之外,執行器還會創建一個Http服務器
這個服務器端口號就是通過XxlJobSpringExecutor配置的端口,demo中就是設置的是9999,底層是基于Netty實現的
圖片
這個Http服務端會接收來自調度中心的請求
當執行器接收到調度中心的請求時,會把請求交給ExecutorBizImpl來處理
圖片
這個類非常重要,所有調度中心的請求都是這里處理的
ExecutorBizImpl實現了ExecutorBiz接口
當你翻源碼的時候會發現,ExecutorBiz還有一個ExecutorBizClient實現
圖片
ExecutorBizClient的實現就是發送http請求,所以這個實現類是在調度中心使用的,用來訪問執行器提供的http接口
圖片
3、注冊到調度中心
當執行器啟動的時候,會啟動一個注冊線程,這個線程會往調度中心注冊當前執行器的信息,包括兩部分數據
- 執行器的名字,也就是設置的appname
- 執行器所在機器的ip和端口,這樣調度中心就可以訪問到這個執行器提供的Http接口
前面提到每個服務實例都會對應一個執行器實例,所以調用中心會保存每個執行器實例的地址
圖片
這里你可以把調度中心的功能類比成注冊中心
任務觸發原理
弄明白執行器啟動時干了哪些事,接下來講一講Xxl-Job最最核心的功能,那就是任務觸發的原理
任務觸發原理我會分下面5個小點來講解
- 任務如何觸發?
- 快慢線程池的異步觸發任務優化
- 如何選擇執行器實例?
- 執行器如何去執行任務?
- 任務執行結果的回調
1、任務如何觸發?
調度中心在啟動的時候,會開啟一個線程,這個線程的作用就是來計算任務觸發時機,這里我把這個線程稱為調度線程
這個調度線程會去查詢xxl_job_info這張表
這張表存了任務的一些基本信息和任務下一次執行的時間
調度線程會去查詢下一次執行的時間 <= 當前時間 + 5s的任務
這個5s是XxlJob寫死的,被稱為預讀時間,提前讀出來,保證任務能準時觸發
舉個例子,假設當前時間是2023-11-29 08:00:10,這里的查詢就會查出下一次任務執行時間在2023-11-29 08:00:15之前執行的任務
圖片
查詢到任務之后,調度線程會去將這些任務根據執行時間劃分為三個部分:
- 當前時間已經超過任務下一次執行時間5s以上,也就是需要在2023-11-29 08:00:05(不包括05s)之前的執行的任務
- 當前時間已經超過任務下一次執行時間,但是但不足5s,也就是在2023-11-29 08:00:05和2023-11-29 08:00:10(不包括10s)之間執行的任務
- 還未到觸發時間,但是一定是5s內就會觸發執行的
圖片
對于第一部分的已經超過5s以上時間的任務,會根據任務配置的調度過期策略來選擇要不要執行
圖片
調度過期策略就兩種,就是字面意思
- 直接忽略這個已經過期的任務
- 立馬執行一次這個過期的任務
對于第二部分的超時時間在5s以內的任務,就直接立馬執行一次,之后如果判斷任務下一次執行時間就在5s內,會直接放到一個時間輪里面,等待下一次觸發執行
對于第三部分任務,由于還沒到執行時間,所以不會立馬執行,也是直接放到時間輪里面,等待觸發執行
當這批任務處理完成之后,不論是前面是什么情況,調度線程都會去重新計算每個任務的下一次觸發時間,然后更新xxl_job_info這張表的下一次執行時間
到此,一次調度的計算就算完成了
之后調度線程還會繼續重復上面的步驟,查任務,調度任務,更新任務下次執行時間,一直死循環下去,這就實現了任務到了執行時間就會觸發的功能
這里在任務觸發的時候還有一個很有意思的細節
由于調度中心可以是集群的形式,每個調度中心實例都有調度線程,那么如何保證任務在同一時間只會被其中的一個調度中心觸發一次?
我猜你第一時間肯定想到分布式鎖,但是怎么加呢?
XxlJob實現就比較有意思了,它是基于八股文中常說的通過數據庫來實現的分布式鎖的
在調度之前,調度線程會嘗試執行下面這句sql
圖片
就是這個sql
select * from xxl_job_lock where lock_name = 'schedule_lock' for update
一旦執行成功,說明當前調度中心成功搶到了鎖,接下來就可以執行調度任務了
當調度任務執行完之后再去關閉連接,從而釋放鎖
由于每次執行之前都需要去獲取鎖,這樣就保證在調度中心集群中,同時只有一個調度中心執行調度任務
最后畫一張圖來總結一下這一小節
圖片
2、快慢線程池的異步觸發任務優化
當任務達到了觸發條件,并不是由調度線程直接去觸發執行器的任務執行
調度線程會將這個觸發的任務交給線程池去執行
所以上圖中的最后一部分觸發任務執行其實是線程池異步去執行的
那么,為什么要使用線程池異步呢?
主要是因為觸發任務,需要通過Http接口調用具體的執行器實例去觸發任務
圖片
這一過程必然會耗費時間,如果調度線程去做,就會耽誤調度的效率
所以就通過異步線程去做,調度線程只負責判斷任務是否需要執行
并且,Xxl-Job為了進一步優化任務的觸發,將這個觸發任務執行的線程池劃分成快線程池和慢線程池兩個線程池
圖片
在調用執行器的Http接口觸發任務執行的時候,Xxl-Job會去記錄每個任務的觸發所耗費的時間
注意并不是任務執行時間,只是整個Http請求耗時時間,這是因為執行器執行任務是異步執行的,所以整個時間不包括任務執行時間,這個后面會詳細說
當任務一次觸發的時間超過500ms,那么這個任務的慢次數就會加1
如果這個任務一分鐘內觸發的慢次數超過10次,接下來就會將觸發任務交給慢線程池去執行
所以快慢線程池就是避免那種頻繁觸發并且每次觸發時間還很長的任務阻塞其它任務的觸發的情況發生
3、如何選擇執行器實例?
上一節說到,當任務需要觸發的時候,調度中心會向執行器發送Http請求,執行器去執行具體的任務
那么問題來了
由于一個執行器會有很多實例,那么應該向哪個實例請求?
這其實就跟任務配置時設置的路由策略有關了
圖片
從圖上可以看出xxljob支持多種路由策略
除了分片廣播,其余的具體的算法實現都是通過ExecutorRouter的實現類來實現的
圖片
這里簡單講一講各種算法的原理,有興趣的小伙伴可以去看看內部的實現細節
第一個、最后一個、輪詢、隨機都很簡單,沒什么好說的
一致性Hash講起來比較復雜,你可以先看看這篇文章,再去查看Xxl-Job的代碼實現
最不經常使用(LFU:Least Frequently Used):Xxl-Job內部會有一個緩存,統計每個任務每個地址的使用次數,每次都選擇使用次數最少的地址,這個緩存每隔24小時重置一次
最近最久未使用(LRU:Least Recently Used):將地址存到LinkedHashMap中,它利用LinkedHashMap可以根據元素訪問(get/put)順序來給元素排序的特性,快速找到最近最久未使用(未訪問)的節點
故障轉移:調度中心都會去請求每個執行器,只要能接收到響應,說明執行器正常,那么任務就會交給這個執行器去執行
忙碌轉移:調度中心也會去請求每個執行器,判斷執行器是不是正在執行當前需要執行的任務(任務執行時間過長,導致上一次任務還沒執行完,下一次又觸發了),如果在執行,說明忙碌,不能用,否則就可以用
分片廣播:XxlJob給每個執行器分配一個編號,從0開始遞增,然后向所有執行器觸發任務,告訴每個執行器自己的編號和總共執行器的數據
我們可以通過XxlJobHelper#getShardIndex獲取到編號,XxlJobHelper#getShardTotal獲取到執行器的總數據量
分片廣播就是將任務量分散到各個執行器,每個執行器只執行一部分任務,加快任務的處理
舉個例子,比如你現在需要處理30w條數據,有3個執行器,此時使用分片廣播,那么此時可將任務分成3分,每份10w條數據,執行器根據自己的編號選擇對應的那份10w數據處理
圖片
當選擇好了具體的執行器實例之后,調用中心就會攜帶一些觸發的參數,發送Http請求,觸發任務
4、執行器如何去執行任務?
相信你一定記得我前面在說執行器啟動是會創建一個Http服務器的時候提到這么一句
當執行器接收到調度中心的請求時,會把請求交給ExecutorBizImpl來處理
所以前面提到的故障轉移和忙碌轉移請求執行器進行判斷,最終執行器也是交給ExecutorBizImpl處理的
執行器處理觸發請求是這個ExecutorBizImpl的run方法實現的
圖片
當執行器接收到請求,在正常情況下,執行器會去為這個任務創建一個單獨的線程,這個線程被稱為JobThread
每個任務在觸發的時候都有單獨的線程去執行,保證不同的任務執行互不影響
之后任務并不是直接交給線程處理的,而是直接放到一個內存隊列中,線程直接從隊列中獲取任務
圖片
這里我相信你一定有個疑惑
為什么不直接處理,而是交給隊列,從隊列中獲取任務呢?
那就得講講不正常的情況了
如果調度中心選擇的執行器實例正在處理定時任務,那么此時該怎么處理呢?**
這時就跟阻塞處理策略有關了
圖片
阻塞處理策略總共有三種:
- 單機串行
- 丟棄后續調度
- 覆蓋之前調度
單機串行的實現就是將任務放到隊列中,由于隊列是先進先出的,所以就實現串行,這也是為什么放在隊列的原因
丟棄調度的實現就是執行器什么事都不用干就可以了,自然而然任務就丟了
覆蓋之前調度的實現就很暴力了,他是直接重新創建一個JobThread來執行任務,并且嘗試打斷之前的正在處理任務的JobThread,丟棄之前隊列中的任務
打斷是通過Thread#interrupt方法實現的,所以正在處理的任務還是有可能繼續運行,并不是說一打斷正在運行的任務就終止了
這里需要注意的一點就是,阻塞處理策略是對于單個執行器上的任務來生效的,不同執行器實例上的同一個任務是互不影響的
比如說,有一個任務有兩個執行器A和B,路由策略是輪詢
任務第一次觸發的時候選擇了執行器實例A,由于任務執行時間長,任務第二次觸發的時候,執行器的路由到了B,此時A的任務還在執行,但是B感知不到A的任務在執行,所以此時B就直接執行了任務
所以此時你配置的什么阻塞處理策略就沒什么用了
如果業務中需要保證定時任務同一時間只有一個能運行,需要把任務路由到同一個執行器上,比如路由策略就選擇第一個
5、任務執行結果的回調
當任務處理完成之后,執行器會將任務執行的結果發送給調度中心
圖片
如上圖所示,這整個過程也是異步化的
- JobThread會將任務執行的結果發送到一個內存隊列中
- 執行器啟動的時候會開啟一個處發送任務執行結果的線程:TriggerCallbackThread
- 這個線程會不停地從隊列中獲取所有的執行結果,將執行結果批量發送給調度中心
- 調用中心接收到請求時,會根據執行的結果修改這次任務的執行狀態和進行一些后續的事,比如失敗了是否需要重試,是否有子任務需要觸發等等
到此,一次任務的就算真正處理完成了
最后
最后我從官網撈了一張Xxl-Job架構圖
圖片
奈何作者不更新吶,導致這個圖稍微有點老了,有點跟現有的架構對不上
比如說圖中的自研RPC(xxl-rpc)部分已經替換成了Http協議,這主要是擁抱生態,方便跨語言接入
但是不要緊,大體還是符合現在的整個的架構
從架構圖中也可以看出來,本文除了日志部分的內容沒有提到,其它的整個核心邏輯基本上都講到了
而日志部分其實是個輔助的作用,讓你更方便查看任務的運行情況,對任務的觸發邏輯是沒有影響的,所以就沒講了
所以從本文的講解再到官方架構圖,你會發現整個Xxl-Job不論是使用還是實現都是比較簡單的,非常的輕量級