搞懂Spring任務(wù)執(zhí)行器和調(diào)度器模型
在日常開(kāi)發(fā)過(guò)程中,如果想要處理長(zhǎng)時(shí)間運(yùn)行的任務(wù),對(duì)于任何應(yīng)用程序開(kāi)發(fā)而言都不是一件容易的事情。有時(shí)候,需要異步執(zhí)行任務(wù)或在特定延遲之后執(zhí)行任務(wù),這可以通過(guò) Spring 的任務(wù)執(zhí)行和任務(wù)調(diào)度來(lái)完成。Spring 框架通過(guò) TaskExecutor 和 TaskScheduler 這兩個(gè)接口引入了對(duì)異步執(zhí)行和任務(wù)調(diào)度的抽象。讓我們一起來(lái)看一下。
Spring 任務(wù)執(zhí)行器
在介紹 Spring 任務(wù)執(zhí)行器之前,要先引出 JDK 中的一個(gè)基礎(chǔ)并發(fā)編程組件,即 Executor。所謂的 Executor,本質(zhì)上是在所有內(nèi)部線程任務(wù)執(zhí)行過(guò)程上提供了一個(gè)抽象層,并管理線程的整個(gè)并發(fā)執(zhí)行流。Executor 是執(zhí)行任務(wù)的入口, JDK 為 Executor 提供了以下三個(gè)基本接口,即 Executor、ExecutorService 和 ScheduledExecutorService。
圖片
Spring 提供 TaskExecutor 接口作為 Executor 的抽象。TaskExecutor 的實(shí)現(xiàn)類有很多,它們提供了針對(duì)異步執(zhí)行過(guò)程的各種支持。
圖片
接下來(lái),以最基礎(chǔ)的 SimpleAsyncTaskExecutor為例,來(lái)看看在 Spring 應(yīng)用程序中執(zhí)行任務(wù)的具體實(shí)現(xiàn)過(guò)程。
TaskExecutor 應(yīng)用方式
SimpleAsyncTaskExecutor 為每個(gè)任務(wù)創(chuàng)建一個(gè)新線程,并以異步方式運(yùn)行,它實(shí)現(xiàn)了 AsyncTaskExecutor 接口。可以通過(guò)以下方式在 Spring 應(yīng)用程序中注入一個(gè) SimpleAsyncTaskExecutor。
@Bean
AsyncTaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
return taskExecutor;
}
然后,可以構(gòu)建一個(gè) AsyncTask,并通過(guò) SimpleAsyncTaskExecutor 來(lái)異步執(zhí)行任務(wù)。
public class AsyncTask {
@Autowired
private AsyncTaskExecutor executor;
public void runTasks() throws Exception {
for (int i = 1; i <= 5; i++) {
Runnable task = new SpringTask(" " + i);
executor.execute(task);
}
}
}
這里創(chuàng)建了一個(gè)自定義的 SpringTask,這是 JDK 中 Runnable 接口的實(shí)現(xiàn)類。
public class SpringTask implements Runnable {
privatestaticfinal Logger LOGGER = Logger.getLogger(Task.class);
private String taskNumber;
public SpringTask(String taskNumber) {
this.taskNumber = taskNumber;
}
@Override
public void run() {
LOGGER.info(Thread.currentThread().getName() + ", Execute Task = " + taskNumber);
taskProcess();
LOGGER.info(Thread.currentThread().getName() + ", End");
}
private void taskProcess() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
這里通過(guò)讓執(zhí)行線程睡眠 2 秒鐘的方式來(lái)模擬任務(wù)的執(zhí)行時(shí)間,并把執(zhí)行的過(guò)程通過(guò)日志的方式打印出來(lái)。執(zhí)行這段代碼,會(huì)在控制臺(tái)中得到如下輸出:
INFO SpringTask:15 - SimpleAsyncTaskExecutor-3, Execute Task = 3
INFO SpringTask:15 - SimpleAsyncTaskExecutor-1, Execute Task = 1
INFO SpringTask:15 - SimpleAsyncTaskExecutor-2, Execute Task = 2
INFO SpringTask:15 - SimpleAsyncTaskExecutor-5, Execute Task = 5
INFO SpringTask:15 - SimpleAsyncTaskExecutor-4, Execute Task = 4
INFO SpringTask:17 - SimpleAsyncTaskExecutor-2, End
INFO SpringTask:17 - SimpleAsyncTaskExecutor-4, End
INFO SpringTask:17 - SimpleAsyncTaskExecutor-3, End
INFO SpringTask:17 - SimpleAsyncTaskExecutor-5, End
INFO SpringTask:17 - SimpleAsyncTaskExecutor-1, End
顯然,基于 SimpleAsyncTaskExecutor,任務(wù)與線程之間應(yīng)該是一對(duì)一的執(zhí)行關(guān)系。
TaskExecutor 運(yùn)行原理
介紹完 Spring 任務(wù)執(zhí)行器的使用方式之后,來(lái)進(jìn)一步分析它的實(shí)現(xiàn)原理。同樣,還是以前面介紹的 SimpleAsyncTaskExecutor 為例展開(kāi)討論。SimpleAsyncTaskExecutor 的類層結(jié)構(gòu)如圖所示:
圖片
這里的 TaskExecutor 和 AsyncTaskExecutor 都比較好理解,直接看它們的接口定義,如下所示:
public interface TaskExecutor extends Executor {
@Override
void execute(Runnable task);
}
public interface AsyncTaskExecutor extends TaskExecutor {
void execute(Runnable task, long startTimeout);
Future<?> submit(Runnable task);
<T> Future<T> submit(Callable<T> task);
}
AsyncListenableTaskExecutor 又?jǐn)U展了 AsyncTaskExecutor,添加了可以返回 ListenableFuture 的方法,ListenableFuture 是 JDK 中 Future 接口的子接口,可用于在任務(wù)提交后添加回調(diào)。
接下來(lái),來(lái)看 AsyncListenableTaskExecutor 中的 execute 方法,如下所示:
@Override
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
//使用包裝器包裝任務(wù)
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
//啟用限流器來(lái)執(zhí)行任務(wù)
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}
//不啟用限流器來(lái)執(zhí)行任務(wù)
else {
doExecute(taskToUse);
}
}
這里引出了限流器的概念,限流器的作用是線程執(zhí)行的并發(fā)度達(dá)到閾值則會(huì)讓后續(xù)的線程處于阻塞等待。這是 Spring TaskExecutor 設(shè)計(jì)上的一個(gè)亮點(diǎn),基本思想如圖所示:
圖片
結(jié)合圖示,不難看出限流器是在線程執(zhí)行之前進(jìn)行并發(fā)限制的判斷,如果需要限流就阻塞線程。而如果任務(wù)執(zhí)行完成后,那就喚醒正在等待的線程繼續(xù)執(zhí)行任務(wù)。
而真正執(zhí)行任務(wù)的 doExecute() 方法比較簡(jiǎn)單,單獨(dú)從線程工廠 ThreadFactory 獲取線程,或者直接創(chuàng)建一個(gè)新的線程進(jìn)行執(zhí)行即可,如下所示:
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
Spring 任務(wù)調(diào)度器
介紹完 Spring 任務(wù)執(zhí)行器,接下來(lái)討論 Spring 任務(wù)調(diào)度器。有時(shí),我們需要以固定的時(shí)間間隔執(zhí)行任務(wù),就可以通過(guò)任務(wù)調(diào)度器來(lái)實(shí)現(xiàn)。基于 Spring,將看到如何使用一些注解來(lái)對(duì)任務(wù)進(jìn)行調(diào)度。
TaskScheduler 應(yīng)用方式
在 Spring 中,可以借助@EnableScheduling 注解來(lái)啟用任務(wù)調(diào)度。
@Configuration
@EnableScheduling
public class SpringSchedulingExample
一旦啟用了任務(wù)調(diào)度,Spring 將自動(dòng)注冊(cè)一個(gè)內(nèi)部 BeanPostProcessor,它將在 Spring 管理的 Bean 上找到添加了@Scheduled 注解的方法。@Scheduled 注解的使用方法如下所示:
@Scheduled(fixedDelay = 2000)
public void scheduledTask() {
LOGGER.info("Execute task " + new Date());
}
在這里,使用@Scheduled 注解為 scheduledTask() 設(shè)置了調(diào)度任務(wù),即通過(guò) fixedDelay 屬性每?jī)擅雸?zhí)行一次該方法。執(zhí)行該方法,可以在控制臺(tái)上看到如下信息:
INFO SpringSchedulingExample:17 - Execute task Sat May 01 20:06:11 CST 2021
INFO SpringSchedulingExample:17 - Execute task Sat May 01 20:06:13 CST 2021
INFO SpringSchedulingExample:17 - Execute task Sat May 01 20:06:15 CST 2021
INFO SpringSchedulingExample:17 - Execute task Sat May 01 20:06:17 CST 2021
INFO SpringSchedulingExample:17 - Execute task Sat May 01 20:06:19 CST 2021
當(dāng)然,還可以使用其他調(diào)度屬性,@Scheduled 注解的完整定義如下所示:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
//cron 表達(dá)式
String cron() default "";
//由 cron 表達(dá)式進(jìn)行解析的時(shí)間區(qū)域
String zone() default "";
//固定延遲時(shí)間
long fixedDelay() default -1;
//字符串形式的固定延遲時(shí)間
String fixedDelayString() default "";
//固定周期
long fixedRate() default -1;
//字符串形式的固定周期
String fixedRateString() default "";
//初始延遲時(shí)間
long initialDelay() default -1;
//字符串形式的初始延遲時(shí)間
String initialDelayString() default "";
}
這些屬性都很簡(jiǎn)單,唯一需要說(shuō)明一下的是 fixedRate。舉個(gè)例子,如果在某個(gè)方法上設(shè)置 fiexdRate=3000,而執(zhí)行該方法所花的時(shí)間是一秒,那么兩秒后就會(huì)再次執(zhí)行該方法。
TaskScheduler 運(yùn)行原理
在 Spring 的 TaskScheduler 出現(xiàn)之前,可以使用 JDK 中的 Timer 或第三方的 Quartz 組件類實(shí)現(xiàn)調(diào)度功能。而 TaskScheduler 的核心優(yōu)勢(shì)就是為開(kāi)發(fā)人員提供了一種抽象,使得執(zhí)行定時(shí)任務(wù)的代碼不需要指定特定的定時(shí)框架。TaskScheduler 接口的定義如下所示:
public interface TaskScheduler {
ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
ScheduledFuture<?> schedule(Runnable task, Date startTime);
ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);
ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);
}
TaskScheduler 中各個(gè)方法的含義可以結(jié)合@Scheduled 注解的說(shuō)明進(jìn)行理解,而這些方法都返回了一個(gè) JDK 中的 ScheduledFuture 對(duì)象,該對(duì)象是對(duì) Future 的擴(kuò)展。
在 Spring 中,TaskScheduler 接口的代表性實(shí)現(xiàn)就是 ThreadPoolTaskScheduler,其類層結(jié)構(gòu)如圖所示:
圖片
翻閱 ThreadPoolTaskScheduler 類的源代碼,發(fā)現(xiàn)在該類中存在如下所示的一個(gè)變量定義。
private ScheduledExecutorService scheduledExecutor;
在前面介紹 JDK Executor 時(shí),已經(jīng)引出了 ScheduledExecutorService,它為開(kāi)發(fā)人員提供了各種調(diào)度方法。所以,看到這里,不難理解,ThreadPoolTaskScheduler 實(shí)際上就是將各種調(diào)度操作委托給了這個(gè) ScheduledExecutorService。通過(guò)如下所示的 schedule 方法實(shí)現(xiàn)過(guò)程印證了這一點(diǎn)。
@Override
public ScheduledFuture<?> schedule(Runnable task, Date startTime) {
ScheduledExecutorService executor = getScheduledExecutor();
long initialDelay = startTime.getTime() - System.currentTimeMillis();
try {
return executor.schedule(errorHandlingTask(task, false), initialDelay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
可以看到,這里首先獲取一個(gè) ScheduledExecutorService,然后通過(guò)它的 schedule 方法完成調(diào)度。而 ScheduledExecutorService 的創(chuàng)建過(guò)程也很簡(jiǎn)單,如下所示:
protected ScheduledExecutorService createExecutor(
int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
}
這樣,就把 JDK 中的 ScheduledExecutorService 和 Spring 中的 TaskScheduler 關(guān)聯(lián)了起來(lái),從而完成了對(duì)任務(wù)調(diào)度過(guò)程的剖析。
總結(jié)
針對(duì)并發(fā)編程,我們可以使用 JDK 所提供的 Thread 類和 Runnable 接口來(lái)創(chuàng)建和管理多線程。但由于這些技術(shù)組件過(guò)于底層,所以在日常開(kāi)發(fā)過(guò)程中,我一般不推薦你使用它們來(lái)創(chuàng)建多線程應(yīng)用程序。
而 JDK 并發(fā)包中的 Executor 等組件雖然經(jīng)過(guò)高度抽象,為開(kāi)發(fā)人員提供了高層次的 API,但由于并發(fā)編程涉及到多線程之間的協(xié)作和交互,合理使用這些組件對(duì)開(kāi)發(fā)人員的要求也很高。
Spring 框架充分考慮到了這些問(wèn)題,并結(jié)合常見(jiàn)的應(yīng)用場(chǎng)景提供了任務(wù)執(zhí)行器和任務(wù)調(diào)度器組件。在今天的內(nèi)容中,我們從應(yīng)用方法和 運(yùn)行原理這兩個(gè)維度對(duì)這兩款技術(shù)組件進(jìn)行了詳細(xì)地分析,幫助你在開(kāi)發(fā)過(guò)程中能夠更好地實(shí)現(xiàn)各種并發(fā)編程需求。
圖片