成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

如何精確控制 asyncio 中并發(fā)運(yùn)行的多個(gè)任務(wù)

開(kāi)發(fā) 前端
之前我們了解了如何創(chuàng)建多個(gè)任務(wù)來(lái)并發(fā)運(yùn)行程序,方式是通過(guò) asyncio.create_task 將協(xié)程包裝成任務(wù)。

之前我們了解了如何創(chuàng)建多個(gè)任務(wù)來(lái)并發(fā)運(yùn)行程序,方式是通過(guò) asyncio.create_task 將協(xié)程包裝成任務(wù),如下所示:

import asyncio, time

async def main():
    task1 = asyncio.create_task(asyncio.sleep(3))
    task2 = asyncio.create_task(asyncio.sleep(3))
    task3 = asyncio.create_task(asyncio.sleep(3))

    await task1
    await task2
    await task3

start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時(shí):", end - start)
"""
總耗時(shí): 3.003109625
"""

但這種代碼編寫(xiě)方式只適用于簡(jiǎn)單情況,如果在同時(shí)發(fā)出數(shù)百、數(shù)千甚至更多 Web 請(qǐng)求的情況下,這種編寫(xiě)方式將變得冗長(zhǎng)且混亂。所以 asyncio 提供了許多便利的函數(shù),支持我們一次性等待多個(gè)任務(wù)。

等待一組任務(wù)全部完成

一個(gè)被廣泛用于等待一組任務(wù)的方式是使用 asyncio.gather,這個(gè)函數(shù)接收一系列的可等待對(duì)象,允許我們?cè)谝恍写a中同時(shí)運(yùn)行它們。如果傳入的 awaitable 對(duì)象是協(xié)程,gather 函數(shù)會(huì)自動(dòng)將其包裝成任務(wù),以確保它們可以同時(shí)運(yùn)行。這意味著不必像之前那樣,用 asyncio.create_task 單獨(dú)包裝,但即便如此,還是建議手動(dòng)包裝一下。

asyncio.gather 同樣返回一個(gè) awaitable 對(duì)象,在 await 表達(dá)式中使用它時(shí),它將暫停,直到傳遞給它的所有 awaitable 對(duì)象都完成為止。一旦所有任務(wù)都完成,asyncio.gather 將返回這些任務(wù)的結(jié)果所組成的列表。

import asyncio
import time
from aiohttp import ClientSession

async def fetch_status(session: ClientSession, url: str):
    async with session.get(url) as resp:
        return resp.status

async def main():
    async with ClientSession() as session:
        # 注意:requests 里面是 100 個(gè)協(xié)程
        # 傳遞給 asyncio.gather 之后會(huì)自動(dòng)被包裝成任務(wù)
        requests = [fetch_status(session, "http://www.baidu.com")
                    for _ in range(100)]
        # 并發(fā)運(yùn)行 100 個(gè)任務(wù),并等待這些任務(wù)全部完成
        # 相比寫(xiě) for 循環(huán)再單獨(dú) await,這種方式就簡(jiǎn)便多了
        status_codes = await asyncio.gather(*requests)
        print(f"{len(status_codes)} 個(gè)任務(wù)已全部完成")

start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時(shí):", end - start)
"""
100 個(gè)任務(wù)已全部完成
總耗時(shí): 0.552532458
"""

完成 100 個(gè)請(qǐng)求只需要 0.55 秒鐘,由于網(wǎng)絡(luò)問(wèn)題,測(cè)試的結(jié)果可能不準(zhǔn)確,但異步肯定比同步要快。

另外傳給 gather 的每個(gè) awaitable 對(duì)象可能不是按照確定性順序完成的,例如將協(xié)程 a 和 b 按順序傳遞給 gather,但 b 可能會(huì)在 a 之前完成。不過(guò) gather 的一個(gè)很好的特性是,不管 awaitable 對(duì)象何時(shí)完成,都保證結(jié)果會(huì)按照傳遞它們的順序返回。

import asyncio
import time

async def main():
    # asyncio.sleep 還可以接收一個(gè) result 參數(shù),作為 await 表達(dá)式的值
    tasks = [asyncio.sleep(second, result=f"我睡了 {second} 秒")
             for second in (5, 3, 4)]
    print(await asyncio.gather(*tasks))

start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時(shí):", end - start)
"""
['我睡了 5 秒', '我睡了 3 秒', '我睡了 4 秒']
總耗時(shí): 5.002968417
"""

然后 gather 還可以實(shí)現(xiàn)分組,什么意思呢?

import asyncio
import time

async def main():
    gather1 = asyncio.gather(
        *[asyncio.sleep(second, result=f"我睡了 {second} 秒")
          for second in (5, 3, 4)]
    )
    gather2 = asyncio.gather(
        *[asyncio.sleep(second, result=f"我睡了 {second} 秒")
          for second in (3, 3, 3)]
    )
    results = await asyncio.gather(
        gather1, gather2, asyncio.sleep(6, "我睡了 6 秒")
    )
    print(results)


start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時(shí):", end - start)
"""
[['我睡了 5 秒', '我睡了 3 秒', '我睡了 4 秒'], 
 ['我睡了 3 秒', '我睡了 3 秒', '我睡了 3 秒'], 
 '我睡了 6 秒']
總耗時(shí): 6.002826208
"""

asyncio.gather 里面可以通過(guò)繼續(xù)接收 asyncio.gather 返回的對(duì)象,從而實(shí)現(xiàn)分組功能,還是比較強(qiáng)大的。

如果 gather 里面啥都不傳的話,那么會(huì)返回一個(gè)空列表。

問(wèn)題來(lái)了,在上面的例子中,我們假設(shè)所有請(qǐng)求都不會(huì)失敗或拋出異常,這是理想情況。但如果請(qǐng)求失敗了呢?我們來(lái)看一下,當(dāng) gather 里面的任務(wù)出現(xiàn)異常時(shí)會(huì)發(fā)生什么?

import asyncio

async def normal_running():
    await asyncio.sleep(3)
    return "正常運(yùn)行"

async def raise_error():
    raise ValueError("出錯(cuò)啦")

async def main():
    results = await asyncio.gather(normal_running(), raise_error())
    print(results)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
Traceback (most recent call last):
    ......
    raise ValueError("出錯(cuò)啦")
ValueError: 出錯(cuò)啦
"""

我們看到拋異常了,其實(shí) gather 函數(shù)的原理就是等待一組任務(wù)運(yùn)行完畢,當(dāng)某個(gè)任務(wù)完成時(shí),就調(diào)用它的 result 方法,拿到返回值。但我們之前介紹 Future 和 Task 的時(shí)候說(shuō)過(guò),如果出錯(cuò)了,調(diào)用 result 方法會(huì)將異常拋出來(lái)。

import asyncio

async def normal_running():
    await asyncio.sleep(3)
    return "正常運(yùn)行"

async def raise_error():
    raise ValueError("出錯(cuò)啦")

async def main():
    try:
        await asyncio.gather(normal_running(), raise_error())
    except Exception:
        print("執(zhí)行時(shí)出現(xiàn)了異常")
    # 但是剩余的任務(wù)仍在執(zhí)行,拿到當(dāng)前的所有正在執(zhí)行的任務(wù)
    all_tasks = asyncio.all_tasks()
    # task 相當(dāng)于對(duì)協(xié)程做了一個(gè)封裝,那么通過(guò) get_coro 方法也可以拿到對(duì)應(yīng)的協(xié)程
    print(f"當(dāng)前剩余的任務(wù):", [task.get_coro().__name__ for task in all_tasks])
    # 繼續(xù)等待剩余的任務(wù)完成
    results = await asyncio.gather(
        *[task for task in all_tasks if task.get_coro().__name__ != "main"]
    )
    print(results)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
執(zhí)行時(shí)出現(xiàn)了異常
當(dāng)前剩余的任務(wù): ['main', 'normal_running']
['正常運(yùn)行']
"""

可以看到在 await asyncio.gather() 的時(shí)候,raise_error() 協(xié)程拋異常了,那么異常會(huì)向上傳播,在 main() 里面 await 處產(chǎn)生 ValueError。我們捕獲之后查看剩余未完成的任務(wù),顯然只剩下 normal_running() 和 main(),因?yàn)槿蝿?wù)執(zhí)行出現(xiàn)異常也代表它完成了。

需要注意的是,一個(gè)任務(wù)出現(xiàn)了異常,并不影響剩余未完成的任務(wù),它們?nèi)栽诤笈_(tái)運(yùn)行。我們舉個(gè)例子證明這一點(diǎn):

import asyncio, time

async def normal_running():
    await asyncio.sleep(5)
    return "正常運(yùn)行"

async def raise_error():
    await asyncio.sleep(3)
    raise ValueError("出錯(cuò)啦")

async def main():
    try:
        await asyncio.gather(normal_running(), raise_error())
    except Exception:
        print("執(zhí)行時(shí)出現(xiàn)了異常")
    # raise_error() 會(huì)在 3 秒后拋異常,然后向上拋,被這里捕獲
    # 而 normal_running() 不會(huì)受到影響,它仍然在后臺(tái)運(yùn)行
    # 顯然接下來(lái)它只需要再過(guò) 2 秒就能運(yùn)行完畢
    time.sleep(2)  # 注意:此處會(huì)阻塞整個(gè)線程
    # asyncio.sleep 是不耗費(fèi) CPU 的,因此即使 time.sleep 將整個(gè)線程阻塞了,也不影響
    # 因?yàn)閳?zhí)行 time.sleep 時(shí),normal_running() 里面的 await asyncio.sleep(5) 已經(jīng)開(kāi)始執(zhí)行了
    results = await asyncio.gather(*[task for task in asyncio.all_tasks()
                                     if task.get_coro().__name__ != "main"])
    print(results)

loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
end = time.perf_counter()
print("總耗時(shí):", end - start)
"""
執(zhí)行時(shí)出現(xiàn)了異常
['正常運(yùn)行']
總耗時(shí): 5.004949666
"""

這里耗時(shí)是 5 秒,說(shuō)明一個(gè)任務(wù)拋異常不會(huì)影響其它任務(wù),因?yàn)?time.sleep(2) 執(zhí)行完畢之后,normal_running() 里面 asyncio.sleep(5) 也已經(jīng)執(zhí)行完畢了,說(shuō)明異常捕獲之后,剩余的任務(wù)沒(méi)有受到影響。

并且這里我們使用了 time.sleep,在工作中千萬(wàn)不要這么做,因?yàn)樗鼤?huì)阻塞整個(gè)線程,導(dǎo)致主線程無(wú)法再做其他事情了。而這里之所以用 time.sleep,主要是想說(shuō)明一個(gè)任務(wù)出錯(cuò),那么將異常捕獲之后,其它任務(wù)不會(huì)受到影響。

那么問(wèn)題來(lái)了,如果發(fā)生異常,我不希望它將異常向上拋該怎么辦呢?可能有人覺(jué)得這還不簡(jiǎn)單,直接來(lái)一個(gè)異常捕獲不就行了?這是一個(gè)解決辦法,但 asyncio.gather 提供了一個(gè)參數(shù),可以更優(yōu)雅的實(shí)現(xiàn)這一點(diǎn)。

import asyncio

async def normal_running():
    await asyncio.sleep(3)
    return "正常運(yùn)行"

async def raise_error():
    raise ValueError("出錯(cuò)啦")

async def main():
    results = await asyncio.gather(
        normal_running(), raise_error(),
        return_exceptions=True
    )
    print(results)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
['正常運(yùn)行', ValueError('出錯(cuò)啦')]
"""

之前在介紹任務(wù)的時(shí)候我們說(shuō)了,不管正常執(zhí)行結(jié)束還是出錯(cuò),都代表任務(wù)已完成,會(huì)將結(jié)果和異常都收集起來(lái),只不過(guò)其中肯定有一個(gè)為 None。然后根據(jù)不同的情況,選擇是否將異常拋出來(lái)。所以在 asyncio 里面,異常只是一個(gè)普通的屬性,會(huì)保存在任務(wù)對(duì)象里面。

對(duì)于 asyncio.gather 也是同理,它里面有一個(gè) return_exceptions 參數(shù),默認(rèn)為 False,當(dāng)任務(wù)出現(xiàn)異常時(shí),會(huì)拋給 await 所在的位置。如果該參數(shù)設(shè)置為 True,那么出現(xiàn)異常時(shí),會(huì)直接把異常本身返回(此時(shí)任務(wù)也算是結(jié)束了)。

在 asyncio 里面,異常變成了一個(gè)可控的屬性。因?yàn)閳?zhí)行是以任務(wù)為單位的,當(dāng)出現(xiàn)異常時(shí),也會(huì)作為任務(wù)的一個(gè)普通的屬性。我們可以選擇將它拋出來(lái),也可以選擇隱式處理掉。

至于我們要判斷哪些任務(wù)是正常執(zhí)行,哪些任務(wù)是拋了異常,便可以通過(guò)返回值來(lái)判斷。如果 isinstance(res, Exception) 為 True,那么證明任務(wù)出現(xiàn)了異常,否則正常執(zhí)行。雖然這有點(diǎn)笨拙,但也能湊合用,因?yàn)?API 并不完美。

當(dāng)然以上這些都不能算是缺點(diǎn),gather 真正的缺點(diǎn)有兩個(gè):

  • 如果我希望所有任務(wù)都執(zhí)行成功,要是有一個(gè)任務(wù)失敗,其它任務(wù)自動(dòng)取消,該怎么實(shí)現(xiàn)呢?比如發(fā)送 Web 請(qǐng)求,如果一個(gè)請(qǐng)求失敗,其他所有請(qǐng)求也會(huì)失敗(要取消請(qǐng)求以釋放資源)。顯然要做到這一點(diǎn)不容易,因?yàn)閰f(xié)程被包裝在后臺(tái)的任務(wù)中;
  • 其次,必須等待所有任務(wù)執(zhí)行完成,才能處理結(jié)果,如果想要在結(jié)果完成后立即處理它們,這就存在問(wèn)題。例如有一個(gè)請(qǐng)求需要 100 毫秒,而另一個(gè)請(qǐng)求需要 20 秒,那么在處理 100 毫秒完成的那個(gè)請(qǐng)求之前,我們將等待 20 秒。

而 asyncio 也提供了用于解決這兩個(gè)問(wèn)題的 API。

在任務(wù)完成時(shí)立即處理

如果想在某個(gè)結(jié)果生成之后就對(duì)其進(jìn)行處理,這是一個(gè)問(wèn)題;如果有一些可以快速完成的等待對(duì)象,和一些可能需要很長(zhǎng)時(shí)間完成的等待對(duì)象,這也可能是一個(gè)問(wèn)題。因?yàn)? gather 需要等待所有對(duì)象執(zhí)行完畢,這就導(dǎo)致應(yīng)用程序可能變得無(wú)法響應(yīng)。

想象一個(gè)用戶發(fā)出 100 個(gè)請(qǐng)求,其中兩個(gè)很慢,但其余的都很快完成。如果一旦有請(qǐng)求完成,可以向用戶輸出一些信息,來(lái)提升用戶的使用體驗(yàn)。

為處理這種情況,asyncio 公開(kāi)了一個(gè)名為 as_completed 的 API 函數(shù),這個(gè)函數(shù)接收一個(gè)可等待對(duì)象(awaitable)組成的列表,并返回一個(gè)生成器。通過(guò)遍歷,等待它們中的每一個(gè)對(duì)象都完成,并且哪個(gè)先完成,哪個(gè)就先被迭代。這意味著將能在結(jié)果可用時(shí)立即就處理它們,但很明顯此時(shí)就沒(méi)有所謂的順序了,因?yàn)闊o(wú)法保證哪些請(qǐng)求先完成。

import asyncio
import time

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    # asyncio 提供的用于等待一組 awaitable 對(duì)象的 API 都很智能
    # 如果檢測(cè)到你傳遞的是協(xié)程,那么會(huì)自動(dòng)包裝成任務(wù)
    # 不過(guò)還是建議手動(dòng)包裝一下
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in (3, 5, 2, 4, 6, 1)]
    for finished in asyncio.as_completed(tasks):
        print(await finished)

loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
end = time.perf_counter()
print("總耗時(shí):", end - start)
"""
我睡了 1 秒
我睡了 2 秒
我睡了 3 秒
我睡了 4 秒
我睡了 5 秒
我睡了 6 秒
總耗時(shí): 6.000872417
"""

和 gather 不同,gather 是等待一組任務(wù)全部完成之后才返回,并且會(huì)自動(dòng)將結(jié)果取出來(lái),結(jié)果值的順序和添加任務(wù)的順序是一致的。對(duì)于 as_completed 而言,它會(huì)返回一個(gè)生成器,我們遍歷它,哪個(gè)任務(wù)先完成則哪個(gè)就先被處理。

那么問(wèn)題來(lái)了,如果出現(xiàn)異常了該怎么辦?很簡(jiǎn)單,直接異常捕獲即可。

然后我們?cè)賮?lái)思考一個(gè)問(wèn)題,任何基于 Web 的請(qǐng)求都存在花費(fèi)很長(zhǎng)時(shí)間的風(fēng)險(xiǎn),服務(wù)器可能處于過(guò)重的資源負(fù)載下,或者網(wǎng)絡(luò)連接可能很差。

之前我們看到了通過(guò) wait_for 函數(shù)可以為特定請(qǐng)求添加超時(shí),但如果想為一組請(qǐng)求設(shè)置超時(shí)怎么辦?as_completed 函數(shù)通過(guò)提供一個(gè)可選的 timeout 參數(shù)來(lái)處理這種情況,它允許以秒為單位指定超時(shí)時(shí)間。如果花費(fèi)的時(shí)間超過(guò)設(shè)定的時(shí)間,那么迭代器中的每個(gè)可等待對(duì)象都會(huì)在等待時(shí)拋出 TimeoutException。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in (1, 5, 6)]
    for finished in asyncio.as_completed(tasks, timeout=3):
        try:
            print(await finished)
        except asyncio.TimeoutError:
            print("超時(shí)啦")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
超時(shí)啦
超時(shí)啦
"""

as_completed 非常適合用于盡快獲得結(jié)果,但它也有缺點(diǎn)。

第一個(gè)缺點(diǎn)是沒(méi)有任何方法可快速了解我們正在等待哪個(gè)協(xié)程或任務(wù),因?yàn)檫\(yùn)行順序是完全不確定的。如果不關(guān)心順序,這可能沒(méi)問(wèn)題,但如果需要以某種方式將結(jié)果與請(qǐng)求相關(guān)聯(lián),那么將面臨挑戰(zhàn)。

第二個(gè)缺點(diǎn)是超時(shí),雖然會(huì)正確地拋出異常并繼續(xù)運(yùn)行程序,但創(chuàng)建的所有任務(wù)仍將在后臺(tái)運(yùn)行。如果想取消它們,很難確定哪些任務(wù)仍在運(yùn)行,這是我們面臨的另一個(gè)挑戰(zhàn)。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in (1, 5, 6)]
    for finished in asyncio.as_completed(tasks, timeout=3):
        try:
            print(await finished)
        except asyncio.TimeoutError:
            print("超時(shí)啦")

    # tasks[1] 還需要 2 秒運(yùn)行完畢,tasks[2] 還需要 3 秒運(yùn)行完畢
    print(tasks[1].done(), tasks[2].done())

    await asyncio.sleep(2)
    # 此時(shí)只剩下 tasks[2],還需要 1 秒運(yùn)行完畢
    print(tasks[1].done(), tasks[2].done())

    await asyncio.sleep(1)
    # tasks[2] 也運(yùn)行完畢
    print(tasks[1].done(), tasks[2].done())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
超時(shí)啦
超時(shí)啦
False False
True False
True True
"""

根據(jù)輸出結(jié)果可以發(fā)現(xiàn),雖然因?yàn)榈诌_(dá)超時(shí)時(shí)間, await 會(huì)導(dǎo)致 TimeoutError,但未完成的任務(wù)不會(huì)受到影響,它們?nèi)匀辉诤笈_(tái)執(zhí)行。

但這對(duì)于我們來(lái)說(shuō),有時(shí)卻不是一件好事,因?yàn)槲覀兿M绻诌_(dá)超時(shí)時(shí)間,那么未完成的任務(wù)就別在執(zhí)行了,這時(shí)候如何快速找到那些未完成的任務(wù)呢?為處理這種情況,asyncio 提供了另一個(gè) API 函數(shù):wait。

使用 wait 進(jìn)行細(xì)粒度控制

gather 和 as_completed 的缺點(diǎn)之一是,當(dāng)我們看到異常時(shí),沒(méi)有簡(jiǎn)單的方法可以取消已經(jīng)在運(yùn)行的任務(wù)。這在很多情況下可能沒(méi)問(wèn)題,但是想象一個(gè)場(chǎng)景:同時(shí)發(fā)送大批量 Web 請(qǐng)求(參數(shù)格式是相同的),如果某個(gè)請(qǐng)求的參數(shù)格式錯(cuò)誤(說(shuō)明所有請(qǐng)求的參數(shù)格式都錯(cuò)了),那么剩余的請(qǐng)求還有必要執(zhí)行嗎?顯然是沒(méi)有必要的,而且還會(huì)消耗更多資源。另外 as_completed 的另一個(gè)缺點(diǎn)是,由于迭代順序是不確定的,因此很難準(zhǔn)確跟蹤已完成的任務(wù)。

于是 asyncio 提供了 wait 函數(shù),注意它和 wait_for 的區(qū)別,wait_for 針對(duì)的是單個(gè)任務(wù),而 wait 則針對(duì)一組任務(wù)(不限數(shù)量)。

注:wait 函數(shù)接收的是一組 awaitable 對(duì)象,但未來(lái)的版本改為僅接收任務(wù)對(duì)象。因此對(duì)于 gather、as_completed、wait 等函數(shù),雖然它們會(huì)自動(dòng)包裝成任務(wù),但我們更建議先手動(dòng)包裝成任務(wù),然后再傳過(guò)去。

并且 wait 和 as_completed 接收的都是任務(wù)列表,而 gather 則要求將列表打散,以多個(gè)位置參數(shù)的方式傳遞,因此這些 API 的參數(shù)格式不要搞混了。

然后是 wait 函數(shù)的返回值,它會(huì)返回兩個(gè)集合:一個(gè)由已完成的任務(wù)(執(zhí)行結(jié)束或出現(xiàn)異常)組成的集合,另一個(gè)由未完成的任務(wù)組成的集合。而 wait 函數(shù)的參數(shù),它除了可以接收一個(gè)任務(wù)列表之外,還可以接收一個(gè) timeout(超時(shí)時(shí)間)和一個(gè) return_when(用于控制返回條件)。光說(shuō)很容易亂,我們來(lái)實(shí)際演示一下。

等待所有任務(wù)完成

如果未指定 retun_when,則此選項(xiàng)使用默認(rèn)值,并且它的行為與 asyncio.gather 最接近,但也存在一些差異。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds)) for seconds in (3, 2, 4)]
    # 和 gather 一樣,默認(rèn)會(huì)等待所有任務(wù)都完成
    done, pending = await asyncio.wait(tasks)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")

    for done_task in done:
        print(await done_task)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務(wù)數(shù): 3
未完成的任務(wù)數(shù): 0
我睡了 2 秒
我睡了 4 秒
我睡了 3 秒
"""

await asynio.wait 時(shí),會(huì)返回兩個(gè)集合,分別保存已完成的任務(wù)和仍然運(yùn)行的任務(wù)。并且由于返回的是集合,所以是無(wú)序的。默認(rèn)情況下,asyncio.wait 會(huì)等到所有任務(wù)都完成后才返回,所以待處理集合的長(zhǎng)度為 0。

然后還是要說(shuō)一下異常,如果某個(gè)任務(wù)執(zhí)行時(shí)出現(xiàn)異常了該怎么辦呢?

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯(cuò)了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds)) for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務(wù)數(shù): 5
未完成的任務(wù)數(shù): 0
Task exception was never retrieved
future: <Task finished ... coro=<delay() done, defined at .../main.py:3> 
         exception=ValueError('我出錯(cuò)了(second is 3)')>
    ......
    raise ValueError("我出錯(cuò)了(second is 3)")
ValueError: 我出錯(cuò)了(second is 3)
"""

對(duì)于 asyncio.gather 而言,如果某個(gè)任務(wù)出現(xiàn)異常,那么異常會(huì)向上拋給 await 所在的位置。如果不希望它拋,那么可以將 gather 里面的 return_exceptions 參數(shù)指定為 True,這樣當(dāng)出現(xiàn)異常時(shí),會(huì)將異常返回。

而 asyncio.wait 也是如此,如果任務(wù)出現(xiàn)異常了,那么會(huì)直接視為已完成,異常同樣不會(huì)向上拋。但是從程序開(kāi)發(fā)的角度來(lái)講,返回值可以不要,但異常不能不處理。

所以當(dāng)任務(wù)執(zhí)行出錯(cuò)時(shí),雖然異常不會(huì)向上拋,但 asyncio 會(huì)將它打印出來(lái),于是就有了:Task exception was never retrieved。意思就是該任務(wù)出現(xiàn)異常了,但你沒(méi)有處理它。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯(cuò)了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds)) for seconds in range(1, 6)]
    # done 里面保存的都是已完成的任務(wù)
    done, pending = await asyncio.wait(tasks)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")

    # 所以我們直接遍歷 done 即可
    for done_task in done:
        # 這里不能使用 await done_task,因?yàn)楫?dāng)任務(wù)完成時(shí),它就等價(jià)于 done_task.result()
        # 而任務(wù)出現(xiàn)異常時(shí),調(diào)用 result() 是會(huì)將異常拋出來(lái)的,所以我們需要先檢測(cè)異常是否為空
        exc = done_task.exception()
        if exc:
            print(exc)
        else:
            print(done_task.result())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務(wù)數(shù): 5
未完成的任務(wù)數(shù): 0
我睡了 5 秒
我睡了 2 秒
我出錯(cuò)了(second is 3)
我睡了 4 秒
我睡了 1 秒
"""

這里調(diào)用 result 和 exception 有一個(gè)前提,就是任務(wù)必須處于已完成狀態(tài),否則會(huì)拋異常:InvalidStateError: Result is not ready.。但對(duì)于我們當(dāng)前是沒(méi)有問(wèn)題的,因?yàn)?done 里面的都是已完成的任務(wù)。

這里能再次看到和 gather 的區(qū)別,gather 會(huì)幫你把返回值都取出來(lái),放在一個(gè)列表中,并且順序就是任務(wù)添加的順序。而 wait 返回的是集合,集合里面是任務(wù),我們需要手動(dòng)拿到返回值。

某個(gè)完成出現(xiàn)異常時(shí)取消其它任務(wù)

從目前來(lái)講,wait 的作用和 gather 沒(méi)有太大的區(qū)別,都是等到任務(wù)全部結(jié)束再解除等待(出現(xiàn)異常也視作任務(wù)完成,并且其它任務(wù)不受影響)。那如果我希望當(dāng)有任務(wù)出現(xiàn)異常時(shí),立即取消其它任務(wù)該怎么做呢?顯然這就依賴 wait 函數(shù)里面的 return_when,它有三個(gè)可選值:

  • asyncio.ALL_COMPLETED:等待所有任務(wù)完成后返回;
  • asyncio.FIRST_COMPLETED:有一個(gè)任務(wù)完成就返回;
  • asyncio.FIRST_EXCEPTION:當(dāng)有任務(wù)出現(xiàn)異常時(shí)返回;

顯然為完成這個(gè)需求,我們應(yīng)該將 return_when 指定為 FIRST_EXCEPTION。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯(cuò)了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds), name=f"睡了 {seconds} 秒的任務(wù)")
             for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")

    print("都有哪些任務(wù)完成了?")
    for t in done:
        print("    " + t.get_name())

    print("還有哪些任務(wù)沒(méi)完成?")
    for t in pending:
        print("    " + t.get_name())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務(wù)數(shù): 3
未完成的任務(wù)數(shù): 2
都有哪些任務(wù)完成了?
    睡了 2 秒的任務(wù)
    睡了 3 秒的任務(wù)
    睡了 1 秒的任務(wù)
還有哪些任務(wù)沒(méi)完成?
    睡了 4 秒的任務(wù)
    睡了 5 秒的任務(wù)
"""

當(dāng) delay(3) 失敗時(shí),顯然 delay(1)、delay(2) 已完成,而 delay(4) 和 delay(5) 未完成。此時(shí)集合 done 里面的就是已完成的任務(wù),pending 里面則是未完成的任務(wù)。

當(dāng) wait 返回時(shí),未完成的任務(wù)仍在后臺(tái)繼續(xù)運(yùn)行,如果我們希望將剩余未完成的任務(wù)取消掉,那么直接遍歷 pending 集合即可。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯(cuò)了(second is 3)")
    print(f"我睡了 {seconds} 秒")

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")
    # 此時(shí)未完成的任務(wù)仍然在后臺(tái)運(yùn)行,這時(shí)候我們可以將它們?nèi)∠?    for t in pending:
        t.cancel()
    # 阻塞 3 秒
    await asyncio.sleep(3)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
我睡了 2 秒
已完成的任務(wù)數(shù): 3
未完成的任務(wù)數(shù): 2
"""

在 await asyncio.sleep(3) 的時(shí)候,剩余兩個(gè)任務(wù)并沒(méi)有輸出,所以任務(wù)確實(shí)被取消了。注:出現(xiàn)異常的任務(wù)會(huì)被掛在已完成集合里面,如果沒(méi)有任務(wù)在執(zhí)行時(shí)出現(xiàn)異常,那么效果等價(jià)于 ALL_COMPLETED。

當(dāng)任務(wù)完成時(shí)處理結(jié)果

ALL_COMPLETED 和 FIRST_EXCEPTION 都有一個(gè)缺點(diǎn),在任務(wù)成功且不拋出異常的情況下,必須等待所有任務(wù)完成。對(duì)于之前的用例,這可能是可以接受的,但如果想要在某個(gè)協(xié)程成功完成后立即處理結(jié)果,那么現(xiàn)在的情況將不能滿足我們的需求。

雖然這個(gè)場(chǎng)景可使用 as_completed 實(shí)現(xiàn),但 as_completed 的問(wèn)題是沒(méi)有簡(jiǎn)單的方法可以查看哪些任務(wù)還在運(yùn)行,哪些任務(wù)已經(jīng)完成。因?yàn)楸闅v的時(shí)候,我們無(wú)法得知哪個(gè)任務(wù)先完成,所以 as_completed 無(wú)法完成我們的需求。

好在 wait 函數(shù)的 return_when 參數(shù)可以接收 FIRST_COMPLETED 選項(xiàng),表示只要有一個(gè)任務(wù)完成就立即返回,而返回的可以是執(zhí)行出錯(cuò)的任務(wù),也可以是成功運(yùn)行的任務(wù)(任務(wù)失敗也表示已完成)。然后,我們可以取消其他正在運(yùn)行的任務(wù),或者讓某些任務(wù)繼續(xù)運(yùn)行,具體取決于用例。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯(cuò)了(second is 3)")
    print(f"我睡了 {seconds} 秒")

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
已完成的任務(wù)數(shù): 1
未完成的任務(wù)數(shù): 4
"""

當(dāng) return_when 參數(shù)為 FIRST_COMPLETED 時(shí),那么只要有一個(gè)任務(wù)完成就會(huì)立即返回,然后我們處理完成的任務(wù)即可。至于剩余的任務(wù),它們?nèi)栽诤笈_(tái)運(yùn)行,我們可以繼續(xù)對(duì)其使用 wait 函數(shù)。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯(cuò)了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    while True:
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for t in done:
            exc = t.exception()
            print(exc) if exc else print(t.result())

        if pending:  # 還有未完成的任務(wù),那么繼續(xù)使用 wait
            tasks = pending
        else:
            break

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
我睡了 2 秒
我出錯(cuò)了(second is 3)
我睡了 4 秒
我睡了 5 秒
"""

整個(gè)行為和 as_completed 是一致的,但這種做法有一個(gè)好處,就是我們每一步都可以準(zhǔn)確地知曉哪些任務(wù)已經(jīng)完成,哪些任務(wù)仍然運(yùn)行,并且也可以做到精確取消指定任務(wù)。

處理超時(shí)

除了允許對(duì)如何等待協(xié)程完成進(jìn)行更細(xì)粒度的控制外,wait 還允許設(shè)置超時(shí),以指定我們希望等待完成的時(shí)間。要啟用此功能,可將 timeout 參數(shù)設(shè)置為所需的最大秒數(shù),如果超過(guò)了這個(gè)超時(shí)時(shí)間,wait 將立即返回 done 和 pending 任務(wù)集。

不過(guò)與目前所看到的 wait_for 和 as_completed 相比,超時(shí)在 wait 中的行為方式存在一些差異。

1)協(xié)程不會(huì)被取消。

當(dāng)使用 wait_for 時(shí),如果任務(wù)超時(shí),則引發(fā) TimeouError,并且任務(wù)也會(huì)自動(dòng)取消。但使用 wait 的情況并非如此,它的行為更接近我們?cè)? as_completed 中看到的情況。如果想因?yàn)槌瑫r(shí)而取消協(xié)程,必須顯式地遍歷任務(wù)并取消,否則它們?nèi)栽诤笈_(tái)運(yùn)行。

2)不會(huì)引發(fā)超時(shí)錯(cuò)誤。

如果發(fā)生超時(shí),則 wait 返回所有已完成的任務(wù),以及在發(fā)生超時(shí)的時(shí)候仍處于運(yùn)行狀態(tài)的所有任務(wù)。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks, timeout=3.1)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務(wù)數(shù): 3
未完成的任務(wù)數(shù): 2
"""

wait 調(diào)用將在 3 秒后返回 done 和 pending 集合,在 done 集合中,會(huì)有三個(gè)已完成的任務(wù)。而耗時(shí) 4 秒和 5 秒的任務(wù),由于仍在運(yùn)行,因此它們將出現(xiàn)在 pending 集合中。我們可以繼續(xù)等待它們完成并提取返回值,也可以將它們?nèi)∠簟?/p>

需要注意:和之前一樣,pending 集合中的任務(wù)不會(huì)被取消,并且繼續(xù)運(yùn)行,盡管會(huì)超時(shí)。對(duì)于要終止待處理任務(wù)的情況,我們需要顯式地遍歷 pending 集合并在每個(gè)任務(wù)上調(diào)用 cancel。

為什么要先將協(xié)程包裝成任務(wù)

我們說(shuō)協(xié)程在傳給 wait 的時(shí)候會(huì)自動(dòng)包裝成任務(wù),那為什么我們還要手動(dòng)包裝呢?

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks, timeout=3.1)
    print(all(map(lambda t: t in tasks, done)))
    print(all(map(lambda t: t in tasks, pending)))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
True
True
"""

如果 wait 函數(shù)接收的就是任務(wù),那么 wait 函數(shù)就不會(huì)再包裝了,所以 done 和 pending 里面的任務(wù)和 tasks 里面的任務(wù)是相同的。基于這個(gè)條件,我們后續(xù)可以做一些比較之類的。

比如有很多 Web 請(qǐng)求任務(wù),但如果當(dāng)未完成的任務(wù)是 task1、task2、task3,那么就取消掉,于是可以這么做。

for t in pending:
    if t in (task1, task2, task3):
        t.cancel()

如果返回的 done 和 pending 里的任務(wù),是在 wait 函數(shù)中自動(dòng)創(chuàng)建的,那么我們就無(wú)法進(jìn)行任何比較來(lái)查看 pending 集合中的特定任務(wù)。

小結(jié)

1)asyncio.gather 函數(shù)允許同時(shí)運(yùn)行多個(gè)任務(wù),并等待它們完成。一旦傳遞給它的所有任務(wù)全部完成,這個(gè)函數(shù)就會(huì)返回。由于 gather 會(huì)拿到里面每個(gè)任務(wù)的返回值,所以它要求每個(gè)任務(wù)都是成功的,如果有任務(wù)執(zhí)行出錯(cuò)(沒(méi)有返回值),那么獲取返回值的時(shí)候就會(huì)將異常拋出來(lái),然后向上傳遞給 await asyncio.gather。

為此,可以將 return_exceptions 設(shè)置為 True,這將返回成功完成的可等待對(duì)象的結(jié)果,以及產(chǎn)生的異常(異常會(huì)作為一個(gè)普通的屬性返回,和返回值是等價(jià)的)。

2)可使用 as_completed 函數(shù)在可等待對(duì)象列表完成時(shí)處理它們的結(jié)果,它會(huì)返回一個(gè)可以循環(huán)遍歷的生成器。一旦某個(gè)協(xié)程或任務(wù)完成,就能訪問(wèn)結(jié)果并處理它。

3)如果希望同時(shí)運(yùn)行多個(gè)任務(wù),并且還希望能了解哪些任務(wù)已經(jīng)完成,哪些任務(wù)在運(yùn)行,則可以使用 wait。這個(gè)函數(shù)還允許在返回結(jié)果時(shí)進(jìn)行更多控制,返回時(shí),我們會(huì)得到一組已經(jīng)完成的任務(wù)和一組仍在運(yùn)行的任務(wù)。

然后可以取消任何想要取消的任務(wù),或執(zhí)行其他任何需要執(zhí)行的任務(wù)。并且 wait 里面的任務(wù)出現(xiàn)異常,也不會(huì)影響其它任務(wù),異常會(huì)作為任務(wù)的一個(gè)屬性,只是在我們沒(méi)有處理的時(shí)候會(huì)給出警告。至于具體的處理方式,我們直接通過(guò) exception 方法判斷是否發(fā)生了異常即可,沒(méi)有異常返回 result(),有異常返回 exception()。

責(zé)任編輯:華軒 來(lái)源: 古明地覺(jué)的編程教室
相關(guān)推薦

2023-04-26 11:59:06

Swift異步編程

2022-04-26 08:41:38

Swift并發(fā)系統(tǒng)iOS

2020-02-21 08:00:00

Pythonasyncio編程語(yǔ)言

2021-04-07 06:00:18

JavaScript 前端并發(fā)控制

2009-02-09 10:06:03

并發(fā)控制Web應(yīng)用悲觀鎖

2017-11-06 17:16:55

Linux設(shè)備驅(qū)動(dòng)并發(fā)控制

2017-08-02 15:00:12

PythonAsyncio異步編程

2017-05-05 08:44:24

PythonAsyncio異步編程

2024-11-27 13:25:24

Rust線程池線程

2009-07-03 12:59:40

Servlet配置

2011-08-30 10:20:41

Silverlight

2021-01-12 10:22:45

JavaScript并發(fā)控制前端

2024-04-30 10:29:46

前端開(kāi)發(fā)h5開(kāi)發(fā)函數(shù)

2025-03-21 09:01:34

Swift任務(wù)取消機(jī)制協(xié)作式取消

2021-05-12 22:07:43

并發(fā)編排任務(wù)

2021-08-01 15:26:59

協(xié)程Asyncio并發(fā)數(shù)

2023-07-14 15:10:17

PythonAsyncIO庫(kù)

2024-03-04 00:02:00

Redis存儲(chǔ)令牌

2024-01-18 08:37:33

socketasyncio線程

2021-05-13 21:58:00

高并發(fā)應(yīng)用Asyncio
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 黄色一级大片在线观看 | 性高朝久久久久久久3小时 av一区二区三区四区 | 二区久久 | 国产一区二区在线视频 | 国产日韩精品在线 | 97日日碰人人模人人澡分享吧 | 91视频88av| 免费日韩av | 久久逼逼| 美日韩免费视频 | 一区二区三区四区在线视频 | 久久综合狠狠综合久久综合88 | 九九热在线免费观看 | 污片在线观看 | 精品免费国产视频 | 久久一区视频 | 国产日产欧产精品精品推荐蛮挑 | 久久成人精品视频 | 98久久| www.日韩高清 | 久久久精品国产 | 国产精品久久久久久久久久久久冷 | 国产一区二区免费电影 | 隔壁老王国产在线精品 | 日韩欧美在线一区 | 成人在线精品 | www国产精 | 国产我和子的乱视频网站 | 91亚洲国产成人精品一区二三 | 国产a级黄色录像 | 凹凸日日摸日日碰夜夜 | 精品一区二区电影 | 亚洲a视频| 成人三级视频 | 国产精品一区在线 | 一级做a毛片 | 欧美日韩在线一区二区 | 亚洲最大的成人网 | 国产一区二区欧美 | yeyeav| 成人在线视频免费观看 |