【技術(shù)革命】JDK21虛擬線程來襲,讓系統(tǒng)的吞吐量翻倍!
1. 虛擬線程簡介
虛擬線程是一種輕量級線程,可大大減少編寫、維護(hù)和觀察高吞吐量并發(fā)應(yīng)用程序的工作量。從JDK19開始發(fā)布了虛擬線程的預(yù)覽功能,直到JDK21最終確定虛擬線程。
虛擬線程既廉價(相比平臺線程)又可以創(chuàng)建非常的多,因此絕不應(yīng)池化:每個應(yīng)用任務(wù)都應(yīng)創(chuàng)建一個新的虛擬線程。因此,大多數(shù)虛擬線程的壽命都很短,調(diào)用堆棧也很淺,只需執(zhí)行一次 HTTP 客戶端調(diào)用或一次 JDBC 查詢。相比之下,平臺線程重量級、成本高,因此通常必須池化。這些線程的壽命往往較長,具有較深的調(diào)用堆棧,可在多個任務(wù)之間共享。
總之,虛擬線程保留了可靠的每請求線程風(fēng)格,這種風(fēng)格與 Java 平臺的設(shè)計相協(xié)調(diào),同時還能優(yōu)化利用可用硬件。使用虛擬線程不需要學(xué)習(xí)新的概念,但可能需要放棄為應(yīng)對當(dāng)前線程的高成本而養(yǎng)成的習(xí)慣。虛擬線程不僅能幫助應(yīng)用程序開發(fā)人員,還能幫助框架設(shè)計人員提供易于使用的 API,這些 API 與平臺設(shè)計兼容,同時又不影響可擴(kuò)展性。
虛擬線程是 java.lang.Thread 的一個實(shí)例,它在底層操作系統(tǒng)線程上運(yùn)行 Java 代碼,但在代碼的整個生命周期中不會捕獲操作系統(tǒng)線程。這意味著許多虛擬線程可以在同一個操作系統(tǒng)線程上運(yùn)行 Java 代碼,從而有效地共享操作系統(tǒng)線程。平臺線程會壟斷寶貴的操作系統(tǒng)線程,而虛擬線程不會。虛擬線程的數(shù)量可能遠(yuǎn)遠(yuǎn)大于操作系統(tǒng)線程的數(shù)量。
虛擬線程是線程的一種輕量級實(shí)現(xiàn),由 JDK 而不是操作系統(tǒng)提供。它們是用戶模式線程的一種形式,在其他多線程語言(如 Go 中的 goroutines(協(xié)程(輕量級線程)) 和 Erlang 中的進(jìn)程)中取得了成功。用戶模式線程在 Java 早期版本中甚至被稱為 "綠色線程",當(dāng)時操作系統(tǒng)線程尚未成熟和普及。然而,Java 的綠色線程都共享一個操作系統(tǒng)線程(M:1 調(diào)度),最終被作為操作系統(tǒng)線程包裝器(1:1 調(diào)度)實(shí)現(xiàn)的平臺線程所超越。虛擬線程采用 M:N 調(diào)度,即大量(M)虛擬線程被安排運(yùn)行在較少數(shù)量(N)的操作系統(tǒng)線程上。
虛擬線程是 java.lang.Thread 的一個實(shí)例,與特定操作系統(tǒng)線程無關(guān)。相比之下,平臺線程是以傳統(tǒng)方式實(shí)現(xiàn)的 java.lang.Thread 實(shí)例,是操作系統(tǒng)線程的薄包裝。
2. 傳統(tǒng)請求線程模型
通常服務(wù)器應(yīng)用程序處理相互獨(dú)立的并發(fā)請求時,在請求的整個持續(xù)聲明周期內(nèi)為該請求指定一個線程來處理該請求。這種按請求線程的風(fēng)格易于理解、易于編程、易于調(diào)試和配置。
對于一個請求處理的處理時間,應(yīng)用程序同時處理的請求數(shù)(即并發(fā)數(shù))必須與吞吐量成比例增長。例如,假設(shè)一個平均延遲為 50 毫秒的請求并發(fā)處理 10 個請求,實(shí)現(xiàn)了每秒 200 個請求的吞吐量。若要將該應(yīng)用的吞吐量提高到到每秒 2000 個請求,則需要并發(fā)處理 100 個請求。如果每個請求在請求持續(xù)時間內(nèi)都由一個線程處理,那么要使應(yīng)用程序跟上進(jìn)度,線程數(shù)必須隨著吞吐量的增加而增加。
由于 JDK 將線程作為操作系統(tǒng)(OS)線程的包裝器來實(shí)現(xiàn)。操作系統(tǒng)線程的成本很高,所以我們不能擁有太多的線程,這就使得線程的實(shí)現(xiàn)不適合按請求執(zhí)行的方式。如果每個請求在其生命周期內(nèi)都要使用一個線程,也就是一個操作系統(tǒng)線程,那么在 CPU 或網(wǎng)絡(luò)連接等其他資源耗盡之前,線程數(shù)量往往就已經(jīng)成為限制因素了。JDK 當(dāng)前的線程實(shí)現(xiàn)將應(yīng)用程序的吞吐量限制在遠(yuǎn)低于硬件支持的水平。即使對線程進(jìn)行了池化,也會出現(xiàn)這種情況,因?yàn)槌鼗兄诒苊鈫有戮€程的高昂成本,但不會增加線程總數(shù)。
3. 虛擬線程使用
使用方式1:
// 創(chuàng)建一個執(zhí)行器,為每個任務(wù)啟動一個新的虛擬線程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 10_000).forEach(i -> {
executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
return i;
});
});
}
本例中的任務(wù)是簡單的代碼--休眠1秒--現(xiàn)代硬件可以輕松支持 10,000 個虛擬線程同時運(yùn)行此類代碼。而實(shí)際上,JDK 只在少量操作系統(tǒng)線程(可能只有一個)上運(yùn)行此代碼代碼。
如果該程序使用 ExecutorService(例如 Executors.newCachedThreadPool())為每個任務(wù)創(chuàng)建一個新的平臺線程,情況就會截然不同。ExecutorService 會嘗試創(chuàng)建 10,000 個平臺線程,從而創(chuàng)建 10,000 個操作系統(tǒng)線程,根據(jù)機(jī)器和操作系統(tǒng)的不同,程序可能會崩潰。
即便使用Executors.newFixedThreadPool(200)創(chuàng)建固定數(shù)量的線程,情況也不會好到哪里去。ExecutorService 將創(chuàng)建 200 個平臺線程,供所有 10,000 個任務(wù)共享,因此許多任務(wù)將順序運(yùn)行而非并發(fā)運(yùn)行,程序?qū)⑿枰荛L時間才能完成。對于該程序而言,擁有 200 個平臺線程的池每秒只能完成 200 個任務(wù),而虛擬線程每秒可完成約 10,000 個任務(wù)(經(jīng)過充分預(yù)熱后)。此外,如果將示例程序中的 10_000 改為 1_000_000,那么程序?qū)⑻峤?1,000,000 個任務(wù),創(chuàng)建 1,000,000 個虛擬線程并發(fā)運(yùn)行,(充分預(yù)熱后)吞吐量將達(dá)到每秒約 1,000,000 個任務(wù)。
注意:如果程序中的任務(wù)在一秒鐘內(nèi)執(zhí)行計算(例如對一個巨大的數(shù)組進(jìn)行排序),而不僅僅是休眠,那么增加線程數(shù)超過處理器內(nèi)核數(shù)將無濟(jì)于事,無論它們是虛擬線程還是平臺線程。虛擬線程不是更快的線程--它們運(yùn)行代碼的速度并不比平臺線程快。它們的存在是為了提供規(guī)模(更高的吞吐量),而不是速度(更低的延遲)。虛擬線程的數(shù)量可能比平臺線程多得多,因此根據(jù)利特爾定律,虛擬線程可以提供更高吞吐量所需的更高并發(fā)性。
使用方式2:
手動創(chuàng)建虛擬線程
// 創(chuàng)建虛擬線程
OfVirtual virtual = Thread.ofVirtual().name("pack") ;
virtual.start(() -> {
System.out.printf("%s - 任務(wù)執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 創(chuàng)建不自動啟動的線程
Thread thread = virtual.unstarted(() -> {
System.out.printf("%s - 任務(wù)執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 手動啟動虛擬線程
thread.start() ;
// 打印線程對象:VirtualThread[#21,pack]/runnable
System.out.println(thread) ;
// 創(chuàng)建普通線程
OfPlatform platform = Thread.ofPlatform().name("pack") ;
Thread thread = platform.start(() -> {
System.out.printf("%s - 任務(wù)執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 這里輸出:Thread[#21,pack,5,main]
System.out.println(thread) ;
在上面的代碼中,打印thread輸出的不是對應(yīng)的平臺線程,而是虛擬線程
VirtualThread[#21,pack]/runnable
在執(zhí)行的任務(wù)中通過Thread.currentThread().getName()方法是沒有任何信息,我們可以通過上面的name()方法來設(shè)置線程的名稱及相關(guān)的前綴。如下:
Thread.ofPlatform().name("pack") ;
Thread.ofVirtual().name("pack", 0) ;
使用方式3:
通過ThreadFactory工廠創(chuàng)建
ThreadFactory threadFactory = Thread.ofVirtual().factory() ;
threadFactory.newThread(() -> {
System.out.printf("%s - 任務(wù)執(zhí)行完成", Thread.currentThread().getName()) ;
}).start() ;
使用方式4:
直接通過Thread靜態(tài)方法
Thread.startVirtualThread(() -> {
System.out.printf("%s - 任務(wù)執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
4. 虛擬線程與傳統(tǒng)線程池對比
使用虛擬線程
public class Demo06 {
static class Task implements Runnable {
@Override
public void run() {
System.err.printf("start - %d%n", System.currentTimeMillis()) ;
try {
Thread.sleep(Duration.ofSeconds(1));
} catch (InterruptedException e) {}
System.err.printf(" end - %d%n", System.currentTimeMillis()) ;
}
}
public static void main(String[] args) throws Exception {
ExecutorService es= Executors.newVirtualThreadPerTaskExecutor() ;
es.submit(new Task()) ;
es.submit(new Task()) ;
es.submit(new Task()) ;
System.in.read() ;
}
}
輸出結(jié)果:
start - 1698827467289
start - 1698827467289
start - 1698827467291
end - 1698827468317
end - 1698827468317
end - 1698827468317
從結(jié)果看出,基本是同時開始,結(jié)束也是基本一起結(jié)束,總耗時1s。
使用傳統(tǒng)線程
任務(wù)都一樣,只是創(chuàng)建線程池的類型修改
public static void main(String[] args) throws Exception {
ExecutorService es= Executors.newFixedThreadPool(1) ;
es.submit(new Task()) ;
es.submit(new Task()) ;
es.submit(new Task()) ;
}
輸出結(jié)果:
start - 1698827686133
end - 1698827687165
start - 1698827687165
end - 1698827688177
start - 1698827688177
end - 1698827689178
從結(jié)果知道這里是一個任務(wù)一個任務(wù)的執(zhí)行串行化,但是你注意觀察,其實(shí)每個任務(wù)的的開始start 的輸出都是要等前一個線程執(zhí)行完了后才能執(zhí)行。結(jié)合上面的虛擬線程對比,start是同時輸出的,這也是虛擬線程的有點(diǎn)了。
5. 使用案例
這是一個遠(yuǎn)程接口調(diào)用的示例:
遠(yuǎn)程3個接口,如下:
@GetMapping("/userinfo")
public Object queryUserInfo() {
try {
TimeUnit.SECONDS.sleep(2) ;
} catch (InterruptedException e) {e.printStackTrace();}
return "查詢用戶信息" ;
}
@GetMapping("/stock")
public Object queryStock() {
try {
TimeUnit.SECONDS.sleep(3) ;
} catch (InterruptedException e) {e.printStackTrace();}
return "查詢庫存信息" ;
}
@GetMapping("/order")
public Object queryOrder() {
try {
TimeUnit.SECONDS.sleep(4) ;
} catch (InterruptedException e) {e.printStackTrace();}
return "查詢訂單信息" ;
}
接口調(diào)用服務(wù),如下:
@Resource
private RestTemplate restTemplate ;
public Map<String, Object> rpc() {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var start = System.currentTimeMillis() ;
// 1.查詢用戶信息
var userinfo = executor.submit(() -> query("http://localhost:8080/demos/userinfo"));
// 2.查詢庫存信息
var stock = executor.submit(() -> query("http://localhost:8080/demos/stock"));
// 3.查詢訂單信息
var order = executor.submit(() -> query("http://localhost:8080/demos/order"));
Map<String, Object> res = Map.of("userinfo", userinfo.get(), "stock", stock.get(), "order", order.get()) ;
System.out.printf("總計耗時:%d毫秒%n", (System.currentTimeMillis() - start)) ;
return res ;
} catch (Exception e) {
return Map.of() ;
}
}
private Object query(String url) {
return this.restTemplate.getForObject(url, String.class) ;
}
在這個案例中,如果使用傳統(tǒng)的線程池,如果并發(fā)量大,那么很可能很多的任務(wù)都要排隊(duì)等待,或者你需要創(chuàng)建更多的平臺線程來滿足吞吐量問題。但是現(xiàn)在有了虛擬線程你可以不用再考慮線程不夠用的情況了,每個任務(wù)的執(zhí)行都會被一個虛擬的線程執(zhí)行(不是平臺線程,可能這些虛擬線程只會對應(yīng)到一個平臺線程)。
虛擬線程可在以下情況顯著提高應(yīng)用吞吐量:
- 并發(fā)任務(wù)的數(shù)量很高(超過幾千)
- 工作負(fù)載不受cpu限制,因?yàn)樵谶@種情況下,線程比處理器內(nèi)核多并不能提高吞吐量
6. 結(jié)構(gòu)化并發(fā)(預(yù)覽功能)
結(jié)構(gòu)化并發(fā)目前還是預(yù)覽功能,并沒有在JDK21中正式發(fā)布,不過我們可以先來看看什么是結(jié)構(gòu)化并發(fā)。
結(jié)構(gòu)化并發(fā) API 是來簡化并發(fā)編程。結(jié)構(gòu)化并發(fā)將在不同線程中運(yùn)行的一組相關(guān)任務(wù)視為一個工作單元,從而簡化了錯誤處理和取消,提高了可靠性,并增強(qiáng)了可觀察性。
結(jié)構(gòu)化并發(fā)的目標(biāo)是:
- 推廣一種并發(fā)編程風(fēng)格,消除因取消和關(guān)閉而產(chǎn)生的常見風(fēng)險,如線程泄漏和取消延遲。
- 提高并發(fā)代碼的可觀察性。
通過示例來理解結(jié)構(gòu)化并發(fā)。
如下示例是通過傳統(tǒng)線程池的方式并發(fā)的從遠(yuǎn)程獲取信息,代碼如下:
static RestTemplate restTemplate = new RestTemplate() ;
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(2) ;
Future<Object> userInfo = es.submit(UnstructuredConcurrentDemo::queryUserInfo) ;
Future<Object> stock = es.submit(UnstructuredConcurrentDemo::queryStock) ;
Object userInfoRet = userInfo.get() ;
System.out.printf("執(zhí)行結(jié)果:用戶信息:%s%n", userInfoRet.toString()) ;
Object stockRet = stock.get() ;
System.out.printf("執(zhí)行結(jié)果:庫存信息:%s%n", stockRet.toString()) ;
}
public static Object queryUserInfo() {
return restTemplate.getForObject("http://localhost:8080/demos/userinfo", String.class) ;
}
public static Object queryStock() {
return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;
}
上面的代碼中沒有什么問題,程序都能夠運(yùn)行的正常,結(jié)果如下:
08:49:53.502 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
08:49:53.504 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結(jié)果:用戶信息:查詢用戶信息
08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結(jié)果:庫存信息:查詢庫存信息
但是如果其中一個任務(wù)執(zhí)行失敗了后會如何呢?將其中一個任務(wù)拋出異常,如下代碼:
public static Object queryStock() {
System.out.println(1 / 0) ;
return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;
}
再次執(zhí)行代碼,結(jié)果如下:
發(fā)生異常:java.lang.ArithmeticException: / by zero
09:06:05.938 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock
09:06:05.948 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]
09:06:08.972 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
09:06:08.974 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結(jié)果:庫存信息:查詢庫存信息
從結(jié)果看出,獲取用戶信息子任務(wù)發(fā)生異常后,并不會影響到獲取庫存子任務(wù)的執(zhí)行。
通過結(jié)構(gòu)化并發(fā)方式
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<Object> userInfo = scope.fork(UnstructuredConcurrentDemo::queryUserInfo) ;
Supplier<Object> stock = scope.fork(UnstructuredConcurrentDemo::queryStock) ;
// 等待在此任務(wù)范圍內(nèi)啟動的所有子任務(wù)完成或某個子任務(wù)失敗。
scope.join() ;
Object userInfoRet = userInfo.get() ;
System.out.printf("執(zhí)行結(jié)果:用戶信息:%s%n", userInfoRet.toString()) ;
Object stockRet = stock.get() ;
System.out.printf("執(zhí)行結(jié)果:庫存信息:%s%n", stockRet.toString()) ;
}
當(dāng)一個子任務(wù)發(fā)生錯誤時,其它的子任務(wù)會在未完成的情況下取消,執(zhí)行結(jié)果如下:
08:59:51.951 [] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock
08:59:51.961 [] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]
Exception in thread "main" java.lang.IllegalStateException: Subtask not completed or did not complete successfully
at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.get(StructuredTaskScope.java:936)
at com.pack.rpc.UnstructuredConcurrentDemo.structured(UnstructuredConcurrentDemo.java:26)
at com.pack.rpc.UnstructuredConcurrentDemo.main(UnstructuredConcurrentDemo.java:17)
從控制臺的輸出看出,獲取庫存的調(diào)用被取消了。
完畢!!!