深入淺出協(xié)程、線程和并發(fā)問(wèn)題
"協(xié)程是輕量級(jí)的線程",相信大家不止一次聽(tīng)到這種說(shuō)法。但是您真的理解其中的含義嗎?恐怕答案是否定的。接下來(lái)的內(nèi)容會(huì)告訴大家協(xié)程是如何在 Android 運(yùn)行時(shí)中被運(yùn)行的,它們和線程之間的關(guān)系是什么,以及在使用 Java 編程語(yǔ)言線程模型時(shí)所遇到的并發(fā)問(wèn)題。
協(xié)程和線程
協(xié)程旨在簡(jiǎn)化異步執(zhí)行的代碼。對(duì)于 Android 運(yùn)行時(shí)的協(xié)程,lambda 表達(dá)式的代碼塊會(huì)在專(zhuān)門(mén)的線程中執(zhí)行。例如,示例中的 斐波那契 運(yùn)算:
- // 在后臺(tái)線程中運(yùn)算第十級(jí)斐波那契數(shù)
- someScope.launch(Dispatchers.Default) {
- val fibonacci10 = synchronousFibonacci(10)
- saveFibonacciInMemory(10, fibonacci10)
- }
- private fun synchronousFibonacci(n: Long): Long { /* ... */ }
上面 async 協(xié)程的代碼塊,會(huì)被分發(fā)到由協(xié)程庫(kù)所管理的線程池中執(zhí)行,實(shí)現(xiàn)了同步且阻塞的斐波那契數(shù)值運(yùn)算,并且將結(jié)果存入內(nèi)存,上例中的線程池屬于 Dispatchers.Default。該代碼塊會(huì)在未來(lái)某些時(shí)間在線程池中的某一線程中執(zhí)行,具體執(zhí)行時(shí)間取決于線程池的策略。
請(qǐng)注意由于上述代碼中未包含掛起操作,因此它會(huì)在同一個(gè)線程中執(zhí)行。而協(xié)程是有可能在不同的線程中執(zhí)行的,比如將執(zhí)行部分移動(dòng)到不同的分發(fā)器,或者在使用線程池的分發(fā)器中包含帶有掛起操作的代碼。
如果不使用協(xié)程的話,您還可以使用線程自行實(shí)現(xiàn)類(lèi)似的邏輯,代碼如下:
- // 創(chuàng)建包含 4 個(gè)線程的線程池
- val executorService = Executors.newFixedThreadPool(4)
- // 在其中的一個(gè)線程中安排并執(zhí)行代碼
- executorService.execute {
- val fibonacci10 = synchronousFibonacci(10)
- saveFibonacciInMemory(10, fibonacci10)
- }
雖然您可以自行實(shí)現(xiàn)線程池的管理,但是我們?nèi)匀煌扑]使用協(xié)程作為 Android 開(kāi)發(fā)中首選的異步實(shí)現(xiàn)方案,它具備內(nèi)置的取消機(jī)制,可以提供更便捷的異常捕捉和結(jié)構(gòu)式并發(fā),后者可以減少類(lèi)似內(nèi)存泄漏問(wèn)題的發(fā)生幾率,并且與 Jetpack 庫(kù)集成度更高。
工作原理
從您創(chuàng)建協(xié)程到代碼被線程執(zhí)行這期間發(fā)生了什么呢?當(dāng)您使用標(biāo)準(zhǔn)的協(xié)程 builder 創(chuàng)建協(xié)程時(shí),您可以指定該協(xié)程所運(yùn)行的 CoroutineDispatcher,如果未指定,系統(tǒng)會(huì)默認(rèn)使用 Dispatchers.Default。
CoroutineDispatcher 會(huì)負(fù)責(zé)將協(xié)程的執(zhí)行分配到具體的線程 。在底層,當(dāng) CoroutineDispatcher 被調(diào)用時(shí),它會(huì)調(diào)用封裝了 Continuation (比如這里的協(xié)程) interceptContinuation 方法來(lái)攔截協(xié)程。該流程是以 CoroutineDispatcher 實(shí)現(xiàn)了 CoroutineInterceptor 接口作為前提。
如果您閱讀了我之前的關(guān)于 協(xié)程在底層是如何實(shí)現(xiàn) 的文章,您應(yīng)該已經(jīng)知道了編譯器會(huì)創(chuàng)建狀態(tài)機(jī),以及關(guān)于狀態(tài)機(jī)的相關(guān)信息 (比如接下來(lái)要執(zhí)行的操作) 是被存儲(chǔ)在 Continuation 對(duì)象中。
一旦 Continuation 對(duì)象需要在另外的 Dispatcher 中執(zhí)行,DispatchedContinuation 的 resumeWith 方法會(huì)負(fù)責(zé)將協(xié)程分發(fā)到合適的 Dispatcher。
此外,在 Java 編程語(yǔ)言的實(shí)現(xiàn)中,繼承自 DispatchedTask 抽象類(lèi)的 DispatchedContinuation 也屬于 Runnable 接口的一種實(shí)現(xiàn)類(lèi)型。因此,DispatchedContinuation 對(duì)象也可以在線程中執(zhí)行。其中的好處是當(dāng)指定了 CoroutineDispatcher 時(shí),協(xié)程就會(huì)轉(zhuǎn)換為 DispatchedTask,并且作為 Runnable 在線程中執(zhí)行。
那么當(dāng)您創(chuàng)建協(xié)程后,dispatch 方法如何被調(diào)用呢?當(dāng)您使用標(biāo)準(zhǔn)的協(xié)程 builder 創(chuàng)建協(xié)程時(shí),您可以指定啟動(dòng)參數(shù),它的類(lèi)型是 CoroutineStart。例如,您可以設(shè)置協(xié)程在需要的時(shí)候才啟動(dòng),這時(shí)可以將參數(shù)設(shè)置為 CoroutineStart.LAZY。默認(rèn)情況下,系統(tǒng)會(huì)使用 CoroutineStart.DEFAULT 根據(jù) CoroutineDispatcher 來(lái)安排執(zhí)行時(shí)機(jī)。
分發(fā)器和線程池
您可以使用 Executor.asCoroutineDispatcher() 擴(kuò)展函數(shù)將協(xié)程轉(zhuǎn)換為 CoroutineDispatcher 后,即可在應(yīng)用中的任何線程池中執(zhí)行該協(xié)程。此外,您還可以使用協(xié)程庫(kù)默認(rèn)的 Dispatchers。
您可以看到 createDefaultDispatcher 方法中是如何初始化 Dispatchers.Default 的。默認(rèn)情況下,系統(tǒng)會(huì)使用 DefaultScheduler。如果您看一下 Dispatcher.IO 的實(shí)現(xiàn)代碼,它也使用了 DefaultScheduler,支持按需創(chuàng)建至少 64 個(gè)線程。Dispatchers.Default 和 Dispatchers.IO 是隱式關(guān)聯(lián)的,因?yàn)樗鼈兪褂昧送粋€(gè)線程池,這就引出了我們下一個(gè)話題,使用不同的分發(fā)器調(diào)用 withContext 會(huì)帶來(lái)哪些運(yùn)行時(shí)的開(kāi)銷(xiāo)呢?
線程和 withContext 的性能表現(xiàn)
在 Android 運(yùn)行時(shí)中,如果運(yùn)行的線程比 CPU 的可用內(nèi)核數(shù)多,那么切換線程會(huì)帶來(lái)一定的運(yùn)行時(shí)開(kāi)銷(xiāo)。上下文切換 并不輕松!操作系統(tǒng)需要保存和恢復(fù)執(zhí)行的上下文,而且 CPU 除了執(zhí)行實(shí)際的應(yīng)用功能之外,還需要花時(shí)間規(guī)劃線程。除此之外,當(dāng)線程中所運(yùn)行代碼阻塞的時(shí)候也會(huì)造成上下文切換。如果上述的問(wèn)題是針對(duì)線程的,那么在不同的 Dispatchers 中使用 withContext 會(huì)帶來(lái)哪些性能上的損失呢?
還好線程池會(huì)幫我們解決這些復(fù)雜的操作,它會(huì)嘗試盡量多地執(zhí)行任務(wù) (這也是為什么在線程池中執(zhí)行操作要優(yōu)于手動(dòng)創(chuàng)建線程)。協(xié)程由于被安排在線程池中執(zhí)行,所以也會(huì)從中受益。基于此,協(xié)程不會(huì)阻塞線程,它們反而會(huì)掛起自己的工作,因而更加有效。
Java 編程語(yǔ)言中默認(rèn)使用的線程池是 CoroutineScheduler 。它以最高效的方式將協(xié)程分發(fā)到工作線程。由于 Dispatchers.Default 和 Dispatchers.IO 使用相同的線程池,在它們之間切換會(huì)盡量避免線程切換。協(xié)程庫(kù)會(huì)優(yōu)化這些切換調(diào)用,保持在同一個(gè)分發(fā)器和線程上,并且盡量走捷徑。
由于 Dispatchers.Main 在帶有 UI 的應(yīng)用中通常屬于不同的線程,所以協(xié)程中 Dispatchers.Default和 Dispatchers.Main 之間的切換并不會(huì)帶來(lái)太大的性能損失,因?yàn)閰f(xié)程會(huì)掛起 (比如在某個(gè)線程中停止執(zhí)行),然后會(huì)被安排在另外的線程中繼續(xù)執(zhí)行。
協(xié)程中的并發(fā)問(wèn)題
協(xié)程由于其能夠簡(jiǎn)單地在不同線程上規(guī)劃操作,的確使得異步編程更加輕松。但是另一方面,便捷是一把雙刃劍: 由于協(xié)程是運(yùn)行在 Java 編程語(yǔ)言的線程模型之上,它們難以逃脫線程模型所帶來(lái)的并發(fā)問(wèn)題。因此,您需要注意并且盡量避免該問(wèn)題。
近年來(lái),像不可變性這樣的策略相對(duì)減輕了由線程所引發(fā)的問(wèn)題。然而,有些場(chǎng)景下,不可變性策略也無(wú)法完全避免問(wèn)題的出現(xiàn)。所有并發(fā)問(wèn)題的源頭都是狀態(tài)管理!尤其是在一個(gè)多線程環(huán)境下訪問(wèn)可變的狀態(tài)。
在多線程應(yīng)用中,操作的執(zhí)行順序是不可預(yù)測(cè)的。與編譯器優(yōu)化操作執(zhí)行順序不同,線程無(wú)法保證以特定的順序執(zhí)行,而上下文切換會(huì)隨時(shí)發(fā)生。如果在訪問(wèn)可變狀態(tài)時(shí)沒(méi)有采取必要的防范措施,線程就會(huì)訪問(wèn)到過(guò)時(shí)的數(shù)據(jù),丟失更新,或者遇到 資源競(jìng)爭(zhēng) 問(wèn)題等等。
請(qǐng)注意這里所討論的可變狀態(tài)和訪問(wèn)順序并不僅限于 Java 編程語(yǔ)言。它們?cè)谄渌脚_(tái)上同樣會(huì)影響協(xié)程執(zhí)行。
使用了協(xié)程的應(yīng)用本質(zhì)上就是多線程應(yīng)用。使用了協(xié)程并且涉及可變狀態(tài)的類(lèi)必須采取措施使其可控,比如保證協(xié)程中的代碼所訪問(wèn)的數(shù)據(jù)是最新的。這樣一來(lái),不同的線程之間就不會(huì)互相干擾。并發(fā)問(wèn)題會(huì)引起潛在的 bug,使您很難在應(yīng)用中調(diào)試和定位問(wèn)題,甚至出現(xiàn) 海森堡 bug。
這一類(lèi)型的類(lèi)非常常見(jiàn)。比如該類(lèi)需要將用戶的登錄信息緩存在內(nèi)存中,或者當(dāng)應(yīng)用在活躍狀態(tài)時(shí)緩存一些值。如果您稍有大意,那么并發(fā)問(wèn)題就會(huì)乘虛而入!使用 withContext(defaultDispatcher) 的掛起函數(shù)無(wú)法保證會(huì)在同一個(gè)線程中執(zhí)行。
比如我們有一個(gè)類(lèi)需要緩存用戶所做的交易。如果緩存沒(méi)有被正確訪問(wèn),比如下面代碼所示,就會(huì)出現(xiàn)并發(fā)問(wèn)題:
- class TransactionsRepository(
- private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
- ) {
- private val transactionsCache = mutableMapOf<User, List<Transaction>()
- private suspend fun addTransaction(user: User, transaction: Transaction) =
- // 注意!訪問(wèn)緩存的操作未被保護(hù)!
- // 會(huì)出現(xiàn)并發(fā)問(wèn)題:線程會(huì)訪問(wèn)到過(guò)期數(shù)據(jù)
- // 并且出現(xiàn)資源競(jìng)爭(zhēng)問(wèn)題
- withContext(defaultDispatcher) {
- if (transactionsCache.contains(user)) {
- val oldList = transactionsCache[user]
- val newList = oldList!!.toMutableList()
- newList.add(transaction)
- transactionsCache.put(user, newList)
- } else {
- transactionsCache.put(user, listOf(transaction))
- }
- }
- }
即使我們這里所討論的是 Kotlin,由 Brian Goetz 所編撰的《Java 并發(fā)編程實(shí)踐》對(duì)于了解本文主題和 Java 編程語(yǔ)言系統(tǒng)是非常好的參考材料。此外,Jetbrains 針對(duì) 共享可變的狀態(tài)和并發(fā) 的主題也提供了相關(guān)的文檔。
保護(hù)可變狀態(tài)
對(duì)于如何保護(hù)可變狀態(tài),或者找到合適的 同步 策略,取決于數(shù)據(jù)本身和相關(guān)的操作。本節(jié)內(nèi)容啟發(fā)大家注意可能會(huì)遇到的并發(fā)問(wèn)題,而不是簡(jiǎn)單羅列保護(hù)可變狀態(tài)的方法和 API。總而言之,這里為大家準(zhǔn)備了一些提示和 API 可以幫助大家針對(duì)可變變量實(shí)現(xiàn)線程安全。
封裝
可變狀態(tài)應(yīng)該屬于并被封裝在類(lèi)里。該類(lèi)應(yīng)該將狀態(tài)的訪問(wèn)操作集中起來(lái),根據(jù)應(yīng)用場(chǎng)景使用同步策略保護(hù)變量的訪問(wèn)和修改操作。
線程限制
一種方案是將讀取和寫(xiě)入操作限制在一個(gè)線程里。可以使用隊(duì)列基于生產(chǎn)者-消費(fèi)者模式實(shí)現(xiàn)對(duì)可變狀態(tài)的訪問(wèn)。Jetbrains 對(duì)此提供了很棒的 文檔。
避免重復(fù)工作
在 Android 運(yùn)行時(shí)中,包含線程安全的數(shù)據(jù)結(jié)構(gòu)可供您保護(hù)可變變量。比如,在計(jì)數(shù)器示例中,您可以使用 AtomicInteger。又比如,要保護(hù)上述代碼中的 Map,您可以使用 ConcurrentHashMap。ConcurrentHashMap 是線程安全的,并且優(yōu)化了 map 的讀取和寫(xiě)入操作的吞吐量。
請(qǐng)注意,線程安全的數(shù)據(jù)結(jié)構(gòu)并不能解決調(diào)用順序問(wèn)題,它們只是確保內(nèi)存數(shù)據(jù)的訪問(wèn)是原子操作。當(dāng)邏輯不太復(fù)雜的時(shí)候,它們可以避免使用 lock。比如,它們無(wú)法用在上面的 transactionCache 示例中,因?yàn)樗鼈冎g的操作順序和邏輯需要使用線程并進(jìn)行訪問(wèn)保護(hù)。
而且,當(dāng)已修改的對(duì)象已經(jīng)存儲(chǔ)在這些線程安全的數(shù)據(jù)結(jié)構(gòu)中時(shí),其中的數(shù)據(jù)需要保持不可變或者受保護(hù)狀態(tài)來(lái)避免資源競(jìng)爭(zhēng)問(wèn)題。
自定義方案
如果您有復(fù)合的操作需要被同步,@Volatile 和線程安全的數(shù)據(jù)結(jié)構(gòu)也不會(huì)有效果。有可能內(nèi)置的 @Synchronized 注解的粒度也不足以達(dá)到理想效果。
在這些情況下,您可能需要使用并發(fā)工具創(chuàng)建您自己的同步機(jī)制,比如 latches、semaphores 或者 barriers。其它場(chǎng)景下,您可以使用 lock 和 mutex 無(wú)條件地保護(hù)多線程訪問(wèn)。
Kotlin 中的 Mute 包含掛起函數(shù) lock 和 unlock,可以手動(dòng)控制保護(hù)協(xié)程的代碼。而擴(kuò)展函數(shù) Mutex.withLock 使其更加易用:
- class TransactionsRepository(
- private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
- ) {
- // Mutex 保護(hù)可變狀態(tài)的緩存
- private val cacheMutex = Mutex()
- private val transactionsCache = mutableMapOf<User, List<Transaction>()
- private suspend fun addTransaction(user: User, transaction: Transaction) =
- withContext(defaultDispatcher) {
- // Mutex 保障了讀寫(xiě)緩存的線程安全
- cacheMutex.withLock {
- if (transactionsCache.contains(user)) {
- val oldList = transactionsCache[user]
- val newList = oldList!!.toMutableList()
- newList.add(transaction)
- transactionsCache.put(user, newList)
- } else {
- transactionsCache.put(user, listOf(transaction))
- }
- }
- }
- }
由于使用 Mutex 的協(xié)程在可以繼續(xù)執(zhí)行之前會(huì)掛起操作,因此要比 Java 編程語(yǔ)言中的 lock 高效很多,因?yàn)楹笳邥?huì)阻塞整個(gè)線程。在協(xié)程中請(qǐng)謹(jǐn)慎使用 Java 語(yǔ)言中的同步類(lèi),因?yàn)樗鼈儠?huì)阻塞整個(gè)協(xié)程所處的線程,并且引發(fā) 活躍度 問(wèn)題。
傳入?yún)f(xié)程中的代碼最終會(huì)在一個(gè)或者多個(gè)線程中執(zhí)行。同樣的,協(xié)程在 Android 運(yùn)行時(shí)的線程模型下依然需要遵循約束條件。所以,使用協(xié)程也同樣會(huì)出現(xiàn)存在隱患的多線程代碼。所以,在代碼中請(qǐng)謹(jǐn)慎訪問(wèn)共享的可變狀態(tài)。