ExecutorCompletionService詳解,你學會了嗎?
本文將深入講解 ExecutorCompletionService 的使用以及源碼解析。
ExecutorCompletionService適用場景
ExecutorCompletionService在以下場景中特別有用:
- 并行任務處理:當需要同時執行多個任務,并按照完成的順序獲取它們的結果時,可以使用ExecutorCompletionService來簡化任務提交和結果獲取的流程。
- 高性能計算:在需要進行大規模計算或復雜計算的場景中,可以將任務拆分成多個子任務,并使用ExecutorCompletionService來管理和獲取子任務的結果。
假設現在有一批需要進行計算的任務,為了提高整批任務的執行效率,我們可以使用線程池來異步計算這些任務。通過向線程池中不斷提交任務并保留與每個任務關聯的Future對象。最后,我們可以遍歷這些Future對象,并通過調用 get() 方法獲取每個任務的計算結果。
Future的不足
Future 沒有辦法回調,只能手動去調用,當通過 get() 方法獲取線程的返回值時,會導致阻塞,也就是和當前這個 Future 關聯的計算任務執行完成的時候才返回結果,新任務必須等待已完成任務的結果才能繼續進行處理。
這樣會浪費很多時間,因為我們不知道哪個線程先執行完了,只能挨個去獲取結果,這樣已經完成的線程會因為前面未完成的線程的耗時而無法提前進行匯總,最好是誰先執行完成,誰先返回。
而 ExecutorCompletionService 可以實現這樣的效果,節省獲取完成結果的時間,它的內部有一個先進先出的阻塞隊列,用于保存已經執行完成的 Future,通過調用它的 take() 方法或 poll() 方法可以獲取到一個已經執行完成的 Future,進而通過調用 Future 接口實現類的 get() 方法獲取最終的結果。
CompletionService的目標是任務誰先完成誰先獲取,即結果按照完成先后順序排序
ExecutorCompletionService使用
ExecutorCompletionService 提供了一種方便的方式來處理一組異步任務,并按照完成的順序獲取它們的結果。它內部使用了Executor框架來執行任務,并且內部管理著一個已完成任務的阻塞隊列,在結果獲取上提供了更加靈活和高效的機制。
下面是一個簡單的例子來演示ExecutorCompletionService的基本使用:
public class ExecutorCompletionServiceExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
// 提交任務
for (int i = 0; i < 10; i++) {
final int taskId = i;
completionService.submit(() -> {
double sleepTime = Math.random() * 1000;
Thread.sleep((long) sleepTime); // 模擬耗時操作
return "Task " + taskId + " completed,cost time: " + sleepTime;
});
}
// 獲取結果
for (int i = 0; i < 10; i++) {
Future<String> future = completionService.take();
String result = future.get();
System.out.println(result);
}
executor.shutdown();
}
}
輸出:
Task 2 completed,cost time: 170.01927312611775
Task 3 completed,cost time: 460.9622858036789
Task 1 completed,cost time: 563.24738180643
Task 0 completed,cost time: 595.938819219159
Task 5 completed,cost time: 480.4473056068137
Task 4 completed,cost time: 748.2343208613524
Task 6 completed,cost time: 370.4679098376097
Task 7 completed,cost time: 270.45945981324905
Task 9 completed,cost time: 336.5536570760892
Task 8 completed,cost time: 577.5774464801026
在上述代碼中,我們創建了一個固定大小的線程池,并使用 ExecutorCompletionService 來提交和獲取任務的結果。通過調用completionService.submit()方法來提交任務,并隨機指定睡眠時間,來模擬任務執行的耗時,然后通過completionService.take()方法來獲取已完成的任務結果。
可以看到是按照任務的執行耗時順序去獲取結果的。
ExecutorCompletionService原理解析
ExecutorCompletionService 提供了兩個構造函數,一個可以指定阻塞隊列,另一個使用內部默認的阻塞隊列,兩個構造函數都需要傳進線程池參數。
圖片
提供了三個獲取方法,可以看到都是從隊列中獲取。
- take()/poll() 方法的工作都委托給內部的已完成任務隊列 completionQueue。
- 如果隊列中有已完成的任務, take() 方法就返回任務的結果,否則阻塞等待任務完成。
- poll() 與 take() 方法不同,poll() 有兩個版本:
無參的 poll() 方法:如果完成隊列中有數據就返回,否則返回null。
有參數的 poll() 方法:如果完成隊列中有數據就直接返回,否則等待指定的時間,到時間后如果還是沒有數據就返回null。
圖片
兩個提交任務方法,可以看到 submit() 方法最終會委托給內部的 executor 去執行任務,提交任務的時候會將任務封裝成 QueueingFuture 對象。
圖片
ExecutorCompletionService內部維護了 QueueingFuture 類,QueueingFuture 繼承了 FutureTask,并重寫了 done() 方法,
可以看到 done() 方法在任務完成的時候會將結果存進 已完成任務隊列 completionQueue 中。
圖片
Futuretask 的 done() 方法是用來標記一個任務已經完成的方法。當一個 Futuretask 中的任務完成后,就會調用 done() 方法通知。
圖片
默認是空方法,不會執行任何動作。
圖片
執行流程
當我們使用ExecutorCompletionService類時,它能夠按照任務完成的順序獲取它們的結果,這是因為ExecutorCompletionService類內部結合了QueueingFuture類和done()方法的機制。以下是源碼流程步驟解釋:
- 提交任務:
我們通過submit方法將任務提交給ExecutorCompletionService。在提交任務時,ExecutorCompletionService會使用自定義的QueueingFuture類來包裝任務,并將其交給底層線程池執行。
- QueueingFuture類:
QueueingFuture類是ExecutorCompletionService的內部類,繼承自FutureTask。它的構造方法接收一個Callable對象作為參數。
在QueueingFuture類中,它重寫了done()方法。done()方法會在任務執行完成后被調用。
任務執行完成時的處理:
當任務執行完成后,在底層線程池的Worker線程中,會調用QueueingFuture的done()方法。
在done()方法中,QueueingFuture會首先調用父類FutureTask的done()方法,以觸發對計算結果的獲取。然后,它會將任務的結果存儲到一個內部的BlockingQueue隊列中(即completionQueue)。
獲取任務結果:
當我們調用take方法獲取任務結果時,它會從completionQueue隊列中取出已完成的任務結果,并返回該結果。如果隊列為空,則會阻塞等待,直到有任務完成并返回結果。
take方法內部會調用QueueingFuture的get()方法,從而觸發對應任務的計算結果的獲取。
保證按順序獲取結果:
由于completionQueue是一個阻塞隊列,并且在done()方法中將任務結果按照完成的順序放入隊列中,因此我們可以通過按順序獲取隊列中的任務結果,來保證按照任務完成的順序獲取它們的結果。
通過以上源碼流程步驟,ExecutorCompletionService類能夠按照任務完成的順序獲取結果。它利用QueueingFuture類包裝任務并存儲結果到阻塞隊列中,在任務執行完成后,按照完成的順序將結果放入隊列,從而實現了按順序獲取結果的功能。
注意事項
在使用ExecutorCompletionService時,需要注意以下事項:
- 合理選擇線程池大小:根據任務的數量和復雜性,合理選擇線程池的大小,以充分利用系統資源并避免資源浪費。
- 及時處理異常:在任務執行過程中,如果發生異常,需要及時處理和記錄異常信息,以保證程序的穩定性和可靠性。
- 使用Future對象進行任務取消和超時控制:通過使用Future對象的cancel方法,可以取消正在執行的任務。同時,可以通過調整 poll 方法的參數來設置超時時間,避免長時間等待任務結果而導致阻塞。
總結
ExecutorCompletionService是一個強大且靈活的工具類,能夠簡化異步任務的處理和結果獲取過程。通過使用ExecutorCompletionService,我們可以更加高效地處理一組異步任務,并按照完成的順序獲取它們的結果。
本文介紹了ExecutorCompletionService的基本使用方法,并對其源碼進行了解析。希望通過這篇博客能夠幫助讀者更好地理解和應用ExecutorCompletionService。