揭秘XXLJOB:任務調度和執行的全面指南
本文主要向大家介紹一下xxljob在調度任務時執行了哪些操作,這也是xxljob最核心的功能
表結構
xxljob是如何觸發任務的,首先我們先了解一下xxljob的表結構
xxl_job_info 記錄的是各個具體job的信息 是xxljob中最重要的表 這張表記錄的job 的調度類型,調度時機,路由策略,阻塞策略等信息
CREATE TABLE `xxl_job_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '執行器主鍵ID',
`job_desc` varchar(255) NOT NULL,
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`author` varchar(64) DEFAULT NULL COMMENT '作者',
`alarm_email` varchar(255) DEFAULT NULL COMMENT '報警郵件',
`schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT '調度類型',
`schedule_conf` varchar(128) DEFAULT NULL COMMENT '調度配置,值含義取決于調度類型',
`misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT '調度過期策略',
`executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '執行器路由策略',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '執行器任務handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '執行器任務參數',
`executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞處理策略',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任務執行超時時間,單位秒',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失敗重試次數',
`glue_type` varchar(50) NOT NULL COMMENT 'GLUE類型',
`glue_source` mediumtext COMMENT 'GLUE源代碼',
`glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE備注',
`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新時間',
`child_jobid` varchar(255) DEFAULT NULL COMMENT '子任務ID,多個逗號分隔',
`trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '調度狀態:0-停止,1-運行',
`trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次調度時間',
`trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次調度時間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
xxl_job_log 這張表記錄了job執行時的一些參數 包括了執行次數 執行參數 jobid, 調度的時機和結果 xxl_job_log_report 記錄了調度中心的統計數據 xxl_job_logglue記錄了glue類型job的日志
xxl_job_registry 記錄了在調度中心注冊的執行器信息 xxl_job_group 記錄了調度中心的分組情況 xxl_job_user 記錄了用戶信息 xxl_job_lock記錄了鎖信息 其實我們通過表的設計就以了解一些xxljob的設計思想,下面我們就來看一下一個任務是如何被調度的
架構簡述
圖片
- 執行器注冊到調度中心
- 頁面新建/修改job ->根據執行策略計算 下次執行時間
- 執行線程輪詢表 5s 執行job -> 阻塞策略
- 記錄執行信息 ->更新下次執行
大家可以看到上圖是一個簡要的job執行的流程圖 在了解代碼前和大家簡要的介紹一下XXLJOB的通信結構,調度中心和執行器各維護一個netty的服務,雙方使用HTTP通信,執行器的ip和端口在啟動時會通過http調用通知調度中心,各位在啟動服務時是可以看到XXLJOB對應的日志的,在看XXLJOB的代碼時大家會看到一些以client或impl結尾的實現,對應的就是調度中心和執行器的netty的實現。
圖片
下面筆者帶大家深入了解一下具體的代碼 在執行器端需配置一些參數,從而時執行器能夠找到調度中心
圖片
包括了調度中心的地址,執行器的ip,端口(可以不填),日志的過期時間(非必填)
代碼詳解
- 啟動/初始化階段 在服務啟動時會啟動netty的服務
XxlJobSpringExecutor#afterSingletonsInstantiated()
->super.start()
->initEmbedServer();
->embedServer.start();
->startRegistry();
->ExecutorRegistryThread.getInstance().start()
最終啟動一個名為registryThread的線程,在這個線程中調用 adminBiz.registry()方法,調用調度中心的接口實現注冊 此時調度中心與執行器(客戶端)就建立起了連接 此外在spring的bean初始化完成后 在XxlJobAdminConfig這個類中還利用afterPropertiesSet實現了初始化了一些線程
圖片
這些線程的作用分別為處理國際化對應的語言,快慢線程池的初始化,服務注冊監控,任務執行監控,任務完成監控,日志報表處理 ,任務執行,限于篇幅,這些線程的作用我們下期再詳細的解釋,到現在為止,XXLJOB的啟動就已經完成了。
- 頁面處理
圖片
在調度中心的界面我們可以新建一個調度任務,其中有幾個參數需要注意,jobhandler,運行模式,阻塞策略,cron。由于目前大部分是在springboot中運行的,所以大部分是依賴Bean執行的,jobHandler則是代碼中定義的job名稱。運行模式決定了調度中心選擇執行器的邏輯
圖片
阻塞策略決定了連續執行時的策略
圖片
cron則決定了調度的時間
- 執行
下面我們深入了解一下一個job是如果被調度中心執行的。執行的邏輯在上文中提到的JobScheduleHelper中實現。首先會先獲取鎖,此處是利用mysql的for update實現的, 然后會獲取5秒內的任務列表
圖片
逐個遍歷拿到的任務,如果任務已經超過了執行時間+預讀時間(5s),根據過期策略決定是丟棄還是立即執行一次,并且更新下次執行時間 如果當前時間大于任務下次執行時間,則將任務放入執行線程池,等待執行,更新下次執行時間,如果下次執行時間在5s內,則會將任務放入時間輪中,等待其他線程處理,再此更新此任務執行時間如果均不是上述兩種情況,則會將任務放入時間輪中,更新執行時間,等待其他線程處理。
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));\
}
}
具體執行任務的方法是 com.xxl.job.admin.core.trigger.XxlJobTrigger#trigger
在這個方法中會根據執行策略(ExecutorRouteStrategyEnum)選擇合適的執行器,找到對應的執行器,利用netty的接口執行對應的任務
尾聲
在本期文章中,向大家介紹了XXLJOB是如何啟動和執行任務的,在下期文章中將向大家介紹XXLJOB其他的細節,由于一個定時任務系統的復雜,很多內容還是需要各位讀者自己看一遍才能真正的理解,畢竟代碼才是真正的老師。