CompletableFuture真香,可以替代CountDownLatch!
在對(duì)類的命名這篇長文中,我們提到了Future和Promise。
Future相當(dāng)于一個(gè)占位符,代表一個(gè)操作將來的結(jié)果。一般通過get可以直接阻塞得到結(jié)果,或者讓它異步執(zhí)行然后通過callback回調(diào)結(jié)果。
但如果回調(diào)中嵌入了回調(diào)呢?如果層次很深,就是回調(diào)地獄。Java中的CompletableFuture其實(shí)就是Promise,用來解決回調(diào)地獄問題。Promise是為了讓代碼變得優(yōu)美而存在的。
有多優(yōu)美?這么說吧,一旦你使用了CompletableFuture,就會(huì)愛不釋手,就像初戀女友一樣,天天想著她。
一系列靜態(tài)方法
從它的源代碼中,我們可以看到,CompletableFuture直接提供了幾個(gè)便捷的靜態(tài)方法入口。其中有run和supply兩組。
run的參數(shù)是Runnable,而supply的參數(shù)是Supplier。前者沒有返回值,而后者有,否則沒有什么兩樣。
這兩組靜態(tài)函數(shù),都提供了傳入自定義線程池的功能。如果你用的不是外置的線程池,那么它就會(huì)使用默認(rèn)的ForkJoin線程池。默認(rèn)的線程池,大小和用途你是控制不了的,所以還是建議自己傳遞一個(gè)。
典型的代碼,寫起來是這個(gè)樣子。
- CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
- return "test";
- });
- String result = future.join();
拿到CompletableFuture后,你就可以做更多的花樣。
這些花樣有很多
我們說面說了,CompletableFuture的主要作用,就是讓代碼寫起來好看。配合Java8之后的stream流,可以把整個(gè)計(jì)算過程抽象成一個(gè)流。前面任務(wù)的計(jì)算結(jié)果,可以直接作為后面任務(wù)的輸入,就像是管道一樣。
- thenApply
- thenApplyAsync
- thenAccept
- thenAcceptAsync
- thenRun
- thenRunAsync
- thenCombine
- thenCombineAsync
- thenCompose
- thenComposeAsync
比如,下面代碼的執(zhí)行結(jié)果是99,并不因?yàn)槭钱惒骄痛騺y代碼執(zhí)行的順序了。
- CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
- .thenApplyAsync((e) -> {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- return e * 10;
- }).thenApplyAsync(e -> e - 1);
- cf.join();
- System.out.println(cf.get());
同樣的,函數(shù)的作用還要看then后面的動(dòng)詞。
- apply 有入?yún)⒑头祷刂担雲(yún)榍爸萌蝿?wù)的輸出
- accept 有入?yún)o返回值,會(huì)返回CompletableFuture
- run 沒有入?yún)⒁矝]有返回值,同樣會(huì)返回CompletableFuture
- combine 形成一個(gè)復(fù)合的結(jié)構(gòu),連接兩個(gè)CompletableFuture,并將它們的2個(gè)輸出結(jié)果,作為combine的輸入
- compose 將嵌套的CompletableFuture平鋪開,用來串聯(lián)兩個(gè)CompletableFuture
when和handle
上面的函數(shù)列表,其實(shí)還有很多。比如:
- whenComplete
when的意思,就是任務(wù)完成時(shí)候的回調(diào)。比如我們上面的例子,打算在完成任務(wù)后,輸出一個(gè)done。它也是屬于只有入?yún)]有出參的范疇,適合放在最后一步進(jìn)行觀測(cè)。
- CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
- .thenApplyAsync((e) -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- return e * 10;
- }).thenApplyAsync(e -> e - 1)
- .whenComplete((r, e)->{
- System.out.println("done");
- })
- ;
- cf.join();
- System.out.println(cf.get());
handle和exceptionally的作用,和whenComplete是非常像的。
- public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
- public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture的任務(wù)是串聯(lián)的,如果它的其中某一步驟發(fā)生了異常,會(huì)影響后續(xù)代碼的運(yùn)行的。
exceptionally從名字就可以看出,是專門處理這種情況的。比如,我們強(qiáng)制某個(gè)步驟除以0,發(fā)生異常,捕獲后返回-1,它將能夠繼續(xù)運(yùn)行。
- CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
- .thenApplyAsync(e->e/0)
- .thenApplyAsync(e -> e - 1)
- .exceptionally(ex->{
- System.out.println(ex);
- return -1;
- });
- cf.join();
- System.out.println(cf.get());
handle更加高級(jí)一些,因?yàn)樗艘粋€(gè)異常參數(shù),還有一個(gè)正常的入?yún)?。處理方法也都類似,不再贅述?/p>
當(dāng)然,CompletableFuture的函數(shù)不僅僅這些,還有更多,根據(jù)函數(shù)名稱很容易能夠了解到它的作用。它還可以替換復(fù)雜的CountDownLatch,這要涉及到幾個(gè)比較難搞的函數(shù)。
替代CountDownLatch
考慮下面一個(gè)場(chǎng)景。某一個(gè)業(yè)務(wù)接口,需要處理幾百個(gè)請(qǐng)求,請(qǐng)求之后再把這些結(jié)果給匯總起來。
如果順序執(zhí)行的話,假設(shè)每個(gè)接口耗時(shí)100ms,那么100個(gè)接口,耗時(shí)就需要10秒。假如我們并行去獲取的話,那么效率就會(huì)提高。
使用CountDownLatch可以解決。
- ExecutorService executor = Executors.newFixedThreadPool(5);
- CountDownLatch countDown = new CountDownLatch(requests.size());
- for(Request request:requests){
- executor.execute(()->{
- try{
- //some opts
- }finally{
- countDown.countDown();
- }
- });
- }
- countDown.await(200,TimeUnit.MILLISECONDS);
我們使用CompletableFuture來替換它。
- ExecutorService executor = Executors.newFixedThreadPool(5);
- List<CompletableFuture<Result>> futureList = requests
- .stream()
- .map(request->
- CompletableFuture.supplyAsync(e->{
- //some opts
- },executor))
- .collect(Collectors.toList());
- CompletableFuture<Void> allCF = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
- allCF.join();
我們這里用到了一個(gè)主要的函數(shù),那就是allOf,用來把所有的CompletableFuture組合在一起;類似的還有anyOf,表示只運(yùn)行其中一個(gè)。常用的,還有三個(gè)函數(shù):
- thenAcceptBoth 處理兩個(gè)任務(wù)的情況,有兩個(gè)任務(wù)結(jié)果入?yún)?,無返回值
- thenCombine 處理兩個(gè)任務(wù)的情況,有入?yún)⒂蟹祷刂?,最喜歡
- runAfterBoth 處理兩個(gè)任務(wù)的情況,無入?yún)?,無返回值
End
自從認(rèn)識(shí)了CompletableFuture,我已經(jīng)很少硬編碼Future了。相對(duì)于各種回調(diào)的嵌套,CompletableFuture為我們提供了更直觀、更優(yōu)美的API。在“多個(gè)任務(wù)等待完成狀態(tài)”這個(gè)應(yīng)用場(chǎng)景,CompletableFuture已經(jīng)成了我的首選。
唯一的問題是,它的函數(shù)有點(diǎn)多,你需要熟悉一小段時(shí)間。另外,有一個(gè)小小的問題,個(gè)人覺得,這個(gè)類如果叫做Promise的話,就能夠和JS的統(tǒng)一起來,算是錦上添花吧。
作者簡(jiǎn)介:小姐姐味道 (xjjdog),一個(gè)不允許程序員走彎路的公眾號(hào)。聚焦基礎(chǔ)架構(gòu)和Linux。十年架構(gòu),日百億流量,與你探討高并發(fā)世界,給你不一樣的味道。