如何在Python 的線程中運行協程
在一篇文章理解Python異步編程的基本原理這篇文章中,我們講到,如果在異步代碼里面又包含了一段非常耗時的同步代碼,異步代碼就會被卡住。
那么有沒有辦法讓同步代碼與異步代碼看起來也是同時運行的呢?方法就是使用事件循環的.run_in_executor()方法。
我們來看一下 Python 官方文檔[1]中的說法:
那么怎么使用呢?還是以非常耗時的遞歸方式計算斐波那契數列的這個函數為例:
- def sync_calc_fib(n):
- if n in [1, 2]:
- return1
- return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)
- async def calc_fib(n):
- result = sync_calc_fib(n)
- print(f'第 {n} 項計算完成,結果是:{result}')
- return result
我們現在需要用 aiohttp 訪問一個延遲5秒的網頁,同時計算斐波那契數列第36項。
首先我們看看單獨計算第36項需要5秒鐘:
我們再來看看如果直接把這計算斐波那契數列和請求網站的兩個異步任務放在一起“并行”,實際時間是兩個任務的時間疊加:
具體原因我在上一篇文章里面已經做了說明。
現在,我想讓兩個任務“同時運行”,于是就可以這樣修改代碼:
- import aiohttp
- import asyncio
- import time
- from concurrent.futures import ThreadPoolExecutor
- async def request(sleep_time):
- async with aiohttp.ClientSession() as client:
- resp = await client.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
- resp_json = await resp.json()
- print(resp_json)
- def sync_calc_fib(n):
- if n in [1, 2]:
- return 1
- return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)
- def calc_fib(n):
- result = sync_calc_fib(n)
- print(f'第 {n} 項計算完成,結果是:{result}')
- return result
- async def main():
- start = time.perf_counter()
- loop = asyncio.get_event_loop()
- with ThreadPoolExecutor(max_workers=4) as executor:
- tasks_list = [
- loop.run_in_executor(executor, calc_fib, 36),
- asyncio.create_task(request(5))
- ]
- await asyncio.gather(*tasks_list)
- end = time.perf_counter()
- print(f'總計耗時:{end - start}')
- asyncio.run(main())
運行效果如下圖所示:
在5秒鐘的時間,就把計算斐波那契數列和請求5秒延遲的網站都做完了。
實現這樣的轉變,關鍵的代碼就是:loop.run_in_executor(executor, calc_fib, 36)
其中的 loop就是主線程的事件循環(event loop),它是用來調度同一個線程里面的多個協程。
executor是我們使用ThreadPoolExecutor(max_workers=4)創建的一個有4個線程的線程池,calc_fib是一個耗時的同步函數,36是傳入calc_fib的參數。loop.run_in_executor(executor, calc_fib, 36)的意思是說:
- 把calc_fib函數放到線程池里面去運行
- 給線程池增加一個回調函數,這個回調函數會在運行結束后的下一次事件循環把結果保存下來。
請注意上圖中紅色箭頭對應的calc_fib這是一個同步函數,請與上一篇文章中的異步函數區分開。run_in_executor的第二個參數需要是一個同步函數的函數名。
在上面的例子中,我們創建的是有4個線程的線程池。所以這個線程池最多允許4個阻塞式的同步函數“并行”。