一文說通異步 LINQ
LINQ 這個(gè)東西,出來很早了,寫過幾年代碼的兄弟們,或多或少都用過一些。
早期的 LINQ,主要是同步的,直到 C# 8.0 加入 IAsyncEnumerable,LINQ 才真正轉(zhuǎn)向異步。這本來是個(gè)非常好的改變,配合 System.Linq.Async 庫提供的擴(kuò)展,可以在諸如 Where、Select、GroupBy 等各種地方用到異步。
但事實(shí)上,在我 Review 代碼時(shí),見了很多人的代碼,并沒有按異步的規(guī)則去使用,出現(xiàn)了很多的坑。
舉個(gè)簡單的例子:
- static async Task<List<T>> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
- {
- var filteredItems = new List<T>();
- await foreach (var item in source)
- {
- if (predicate(item))
- {
- filteredItems.Add(item);
- }
- }
- return filteredItems;
- }
這樣的寫法,看著是用到了 async / await 對(duì),但實(shí)際上并沒有實(shí)現(xiàn)異步,程序依然是按照同步在運(yùn)行。換句話說,這只是一個(gè)樣子上的異步,實(shí)際沒有任何延遲執(zhí)行的效果。
1. 延遲執(zhí)行
其實(shí),這兒正確的寫法也挺簡單,用到的就是個(gè)異步的迭代器(關(guān)于異步迭代器,如果需要了解,可以看我的另一篇推文):
- static async IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
- {
- await foreach (var item in source)
- {
- if (predicate(item))
- {
- yield return item;
- }
- }
- }
這種寫法下,編譯器會(huì)將方法轉(zhuǎn)了狀態(tài)機(jī),并在實(shí)際調(diào)用時(shí),才通過枚舉器返回異步枚舉項(xiàng)。
看看調(diào)用過程:
- IAsyncEnumerable<User> users = ...
- IAsyncEnumerable<User> filteredUsers = users.Where(User => User.Name == "WangPlus");
- await foreach (User user in filteredUsers)
- {
- Console.WriteLine(user.Age);
- }
在這個(gè)調(diào)用的例子中,在 Where 時(shí),實(shí)際方法并不會(huì)馬上開始。只有在下面 foreach 時(shí),才真正開始執(zhí)行 Where 方法。
延遲執(zhí)行,這是異步 LINQ 的第一個(gè)優(yōu)勢(shì)。
2. 流執(zhí)行
流執(zhí)行,依托的也是異步迭代器。
所謂流執(zhí)行,其實(shí)就是根據(jù)調(diào)用的要求,一次返回一個(gè)對(duì)象。通過使用異步迭代器,可以不用一次返回所有的對(duì)象,而是一個(gè)一個(gè)地返回單個(gè)的對(duì)象,直到枚舉完所有的對(duì)象。
流執(zhí)行需要做個(gè)技巧性的代碼,需要用到一個(gè) C# 8.0 的新特性:局部方法。
看代碼:
- static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
- {
- return Core();
- async IAsyncEnumerable<T> Core()
- {
- await foreach (var item in source)
- {
- if (predicate(item))
- {
- yield return item;
- }
- }
- }
- }
3. 取消異步 LINQ
前面兩個(gè)小節(jié),寫的是異步 LINQ 的執(zhí)行。
通常使用異步 LINQ 的原因,就是因?yàn)閳?zhí)行時(shí)間長,一般需要一段時(shí)間來完成。因此,取消異步 LINQ 就很重要。想象一下,一個(gè)長的 DB 查詢已經(jīng)超時(shí)了的情況,該怎么處理?
為了支持取消,IAsyncEnumerable.GetEnumerator 本身接受一個(gè) CancellationToken 參數(shù)來中止任務(wù),并用一個(gè)擴(kuò)展方法掛接到 foreach 調(diào)用:
- CancellationToken cancellationToken = ...
- IAsyncEnumerable<User> users = ...
- IAsyncEnumerable<User> filteredUsers = users.Where(User => User.Name == "WangPlus");
- await foreach (var User in filteredUsers.WithCancellation(cancellationToken))
- {
- Console.WriteLine(User.Age);
- }
同時(shí),在上面的 Where 定義中,也要響應(yīng) CancellationToken 參數(shù):
- static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
- {
- return Core();
- async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- await foreach (var item in source.WithCancellation(cancellationToken))
- {
- if (predicate(item))
- {
- yield return item;
- }
- }
- }
- }
多解釋一下:在 Where 方法中,CancellationToken 只能加到局部函數(shù) Core 中,一個(gè)簡單的原因是 Where 本身并不是異步方法,而且,我們也不希望從 Where 往里傳遞。想象一下:
- Users.Where(xxx, cancellationToken).Select(xxx, cancellationToken).OrderBy(xxx, cancellationToken);
這樣的代碼會(huì)讓人暈死。
所以,我們會(huì)采用上面的方式,允許消費(fèi)者在枚舉數(shù)據(jù)時(shí)傳遞 CancellationToken 來達(dá)到取消異步操作的目的。
4. 處理ConfigureAwait(false)
這是另一個(gè)異步必須要注意的部分,其實(shí)就是上下文。
通常大多數(shù)的方法,我們不需要關(guān)注上下文,但總有一些需要,在等待的異步操作恢復(fù)后,需要返回到某個(gè)上下文的情況。這種情況在 UI 線程編碼時(shí)通常都需要考慮。很多人提到的異步死鎖,就是這個(gè)原因。
處理也很簡單:
- static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
- {
- return Core();
- async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
- {
- if (predicate(item))
- {
- yield return item;
- }
- }
- }
- }
這兒也多說兩句:按微軟的說法,await foreach 本身是基于模式的,WithCancellation 和 ConfigureAwait 返回同樣的結(jié)構(gòu)體 ConfiguredCancelableAsyncEnumerable。這個(gè)結(jié)構(gòu)體沒有實(shí)現(xiàn) IAsyncEnumerable 接口,而是做了一個(gè) GetAsyncEnumerator 方法,返回一個(gè)具有 MoveNextAsync、Current、DisposeAsync 的枚舉器,因此可以 await foreach 。
5. 方法擴(kuò)展
上面 4 個(gè)小節(jié),我們完成了一個(gè) Where 異步 LINQ 的全部內(nèi)容。
不過,這個(gè)方法有一些限制和不足。熟悉異步的兄弟們應(yīng)該已經(jīng)看出來了,里面用了一個(gè)委托 predicate 來做數(shù)據(jù)過濾,而這個(gè)委托,是個(gè)同步的方法。
事實(shí)上,根據(jù)微軟對(duì)異步 LINQ 的約定,每個(gè)操作符應(yīng)該是三種重載:
- 同步委托的實(shí)現(xiàn),就是上面的 Where 方法;
- 異步委托的實(shí)現(xiàn),這個(gè)是指具有異步返回類型的實(shí)現(xiàn),通常這種方法名稱會(huì)用一個(gè) Await 做后綴,例如:WhereAwait;
- 可以接受取消的異步委托的實(shí)現(xiàn),通常這種方法會(huì)用 AwaitWithCancellation 做后綴,例如:WhereAwaitWithCancellation。
參考微軟的異步方法,基本上都是以這種結(jié)構(gòu)來命名方法名稱的。
下面,我們也按這個(gè)方式,來做一個(gè) Where 方法的幾個(gè)重載。
WhereAwait 方法
上面說了,這會(huì)是一個(gè)異步實(shí)現(xiàn)。所以,條件部分就不能用 Func
代碼是這樣:
- static IAsyncEnumerable<T> WhereAwait<T>(this IAsyncEnumerable<T> source, Func<T, ValueTask<bool>> predicate)
- {
- return Core();
- async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
- {
- if (await predicate(item).ConfigureAwait(false))
- {
- yield return item;
- }
- }
- }
- }
調(diào)用時(shí)是這樣:
- IAsyncEnumerable<User> filteredUsers = users.WhereAwait(async user => await someIfFunction());
在上面的基礎(chǔ)上,又加了一個(gè)取消操作。
看代碼:
- static IAsyncEnumerable<T> WhereAwaitWithCancellation<T>(this IAsyncEnumerable<T> source, Func<T, CancellationToken, ValueTask<bool>> predicate)
- {
- return Core();
- async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
- {
- if (await predicate(item, cancellationToken).ConfigureAwait(false))
- {
- yield return item;
- }
- }
- }
- }
調(diào)用時(shí)是這樣:
IAsyncEnumerable
6. 總結(jié)
異步 LINQ,多數(shù)是在 LINQ 的擴(kuò)展方法中使用,而不是我們通常習(xí)慣的 LINQ 直寫。
事實(shí)上,異步 LINQ 的擴(kuò)展,對(duì) LINQ 本身是有比較大的強(qiáng)化作用的,不管從性能,還是可讀性上,用多了,只會(huì)更爽。