在SpringBoot項目中使用CompletableFuture優化并發REST調用的正確姿勢
環境:SpringBoot2.7.18
1. 簡介
在項目開發時,經常會遇到從不同的接口服務拉取數據并將其匯總到響應中。在微服務中,這些數據源通常是外部 REST API。在本篇文章中,我們將使用 Java 的 CompletableFuture 高效地并行請求多個外部 REST API 中的數據。同時,會對整個請求過程中的異常處理、請求超時進行詳細的介紹。
2. 為什么要并行調用?
假設我們需要更新對象中的多個字段,每個字段的值都來自外部 REST 調用。一種方法也是最簡單的方式是依次調用每個 API 來更新每個字段。
但是,等待一個 REST 調用完成后再啟動另一個會增加服務的整體響應時間。例如,如果我們調用兩個應用程序接口,每個需要 5 秒鐘,那么總時間至少要 10 秒鐘,因為第二個調用需要等待第一個調用完成。
相反,我們可以并行調用所有 API,這樣總時間就是最慢(耗時最長)的 REST 調用時間。例如,一個調用需要 7 秒,另一個需要 5 秒。在這種情況下,我們將等待 7 秒,因為我們已經并行處理了所有內容,必須等待所有結果完成。
因此,并行化是減少服務響應時間、提高服務可擴展性和改善用戶體驗的絕佳選擇。
3. 實戰案例
3.1 定義用于更新的目標 POJO
public class Purchase {
private String orderDescription ;
private String paymentDescription ;
private String buyerName ;
private String orderId ;
private String paymentId ;
private String userId ;
// getters and setters
}
該采購類有三個需要更新的字段,每個字段都需要通過 ID 進行不同的 REST 調用來查詢。
接下來,先創建一個類,定義一個 RestTemplate Bean 和一個用于 REST 調用的域 URL:
@Component
public class PurchaseRestCallsAsyncExecutor {
private static final String BASE_URL = "http://www.pack.com" ;
private final RestTemplate restTemplate ;
public PurchaseRestCallsAsyncExecutor(RestTemplate restTemplate) {
this.restTemplate = restTemplate ;
}
}
接下來,分別編寫3個REST接口調用的方法
現在,讓我們來定義 /orders API 調用:
public String getOrderDescription(String orderId) {
ResponseEntity<String> result = restTemplate.getForEntity(
String.format("%s/orders/%s", BASE_URL, orderId),
String.class) ;
return result.getBody() ;
}
然后,讓我們定義 /payments API 調用:
public String getPaymentDescription(String paymentId) {
ResponseEntity<String> result = restTemplate.getForEntity(
String.format("%s/payments/%s", BASE_URL, paymentId),
String.class) ;
return result.getBody() ;
}
最后,我們定義了 /users API 調用:
public String getUserName(String userId) {
ResponseEntity<String> result = restTemplate.getForEntity(
String.format("%s/users/%s", BASE_URL, userId),
String.class) ;
return result.getBody() ;
}
這三個接口方法都使用 getForEntity() 方法進行 REST 調用,并將結果封裝在一個 ResponseEntity 對象中。
3.2 使用 CompletableFuture 進行多次 REST 調用
現在,我們就可以創建一個方法,用于構建和運行一組三個 CompletableFutures:
public void updatePurchase(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
.thenAccept(purchase::setOrderDescription),
CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
.thenAccept(purchase::setPaymentDescription),
CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
.thenAccept(purchase::setBuyerName)
).join() ;
}
我們使用allOf()方法來構建CompletableFuture的步驟。每個參數都是一個并行任務,這些任務以另一個通過REST調用及其結果構建的CompletableFuture的形式存在。
我們首先使用supplyAsync()方法提供了一個Supplier,從這個Supplier中我們將檢索數據。然后,我們使用thenAccept()來消費supplyAsync()的結果,并將其設置到Purchase類中相應的字段上。
在allOf()方法結束時,我們只是構建了這些任務,但尚未執行任何操作。
最后,我們在所有任務構建完畢后調用join()方法來并行運行所有任務并收集它們的結果。由于join()是一個線程阻塞操作,我們只在最后調用它,而不是在每個任務步驟之后調用,這是為了通過減少線程阻塞來優化應用程序性能。
由于我們沒有為supplyAsync()方法提供一個自定義的ExecutorService,因此所有任務都在同一個Executor中運行。默認情況下,Java使用ForkJoinPool.commonPool()。
建議為supplyAsync()方法指定一個自定義的ExecutorService是一個好習慣,這樣我們可以對線程池參數有更多的控制。
3.3 錯誤處理
在分布式系統中,服務不可用或網絡故障是很常見的。這些故障可能發生在外部 REST API 中,而我們作為該 API 的客戶端卻并不知情。例如,如果應用程序宕機,這就導致發送的請求將永遠無法完成。
因此,我們可以使用 handle() 方法單獨處理每個 REST 調用異常:
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) ;
該方法的參數是一個 BiFunction,其中包含作為參數的上一個任務的結果和異常。 接下來我們將 handle() 步驟添加到 CompletableFuture 的一個步驟中
public void updatePurchaseHandlingExceptions(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
.thenAccept(purchase::setPaymentDescription)
.handle((result, exception) -> {
if (exception != null) {
// 異常處理
return null ;
}
return result ;
})
).join() ;
}
在示例中,handle() 從 thenAccept() 調用的 setPaymentDescription() 中獲取一個 Void 類型,然后將 thenAccept() 動作中拋出的任何錯誤存儲到異常中。最后,如果沒有異常拋出,則 handle() 返回作為參數傳遞的值。否則,返回空值。
3.4 處理 REST 調用超時
當我們使用 CompletableFuture 時,我們可以指定一個任務超時,類似于我們在 REST 調用中定義的超時。因此,如果任務沒有在指定時間內完成,Java 會以超時異常(TimeoutException)結束任務執行,修改代碼如下:
public void updatePurchaseHandlingExceptions(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
.thenAccept(purchase::setOrderDescription)
// 設置超時時間5s
.orTimeout(5, TimeUnit.SECONDS)
.handle((result, exception) -> {
if (exception instanceof TimeoutException) {
// 異常處理
return null ;
}
return result ;
})
).join() ;
}
我們在 CompletableFuture 中通過 orTimeout() 方法設置超時時間,如果在 5 秒內未完成任務時停止任務執行。同時還在 handle() 方法中添加了 if 語句,以便單獨處理 TimeoutException。在 CompletableFuture 中添加超時可確保任務始終完成。這對于避免線程無限期地等待可能永遠不會完成的操作結果非常重要。因此,它減少了處于長時間運行狀態的線程數量,提高了應用程序的健康度。