一日一技:等待多個線程同時結束的兩種方法
我們在寫多線程代碼的時候,可能會需要等待多個線程同時結束,然后再進行后續的流程。例如,我做了一個聚合搜索引擎,用戶輸入一個關鍵詞,我需要同時在很多個搜索引擎上搜索,然后把搜索結果匯總以后返回給用戶。
示例代碼如下:
- @app.get('/api/search')
- def search(keyword: str):
- google_result = requests.get('Google 搜索地址').text
- baidu_result = requests.get('百度搜索地址').text
- bing_result = requests.get('Bing搜索地址').text
- result = combine(google_result, baidu_result, bing_result)
- return {'success': True, 'result': result}
從上面這段代碼,大家可能會發現一個問題,就是在請求多個搜索引擎的時候是串行的,先訪問 Google,訪問完成再訪問百度,訪問完成最后訪問 Bing。這樣顯然會浪費大量的時間。
如果你不會async/await,那么為了解決這個問題,你能想到的顯然就是使用多線程。使用3個線程同時訪問 Google、百度和 Bing,然后把結果匯總傳入combine函數,不就解決問題了嗎?
如果僅僅是啟動多個線程,那么做法很簡單:
- import threading
- def get_url(url):
- result = requests.get(url, headers=HEADERS).text
- return result
- @app.get('/api/search')
- def search(keyword: str):
- google_thead = threading.Thread(target=get_url, 'Google 搜索地址')
- baidu_thread = threading.Thread(target=get_url, '百度搜索地址')
- bing_thread = threading.Thread(target=get_url, 'Bing搜索地址')
- google_thread.start()
- baidu_thread.start()
- bing_thread.start()
- ...
現在問題來了,三個線程確實已經啟動了,但你怎么知道到什么時候為止,所有線程都運行完畢?
這里我們給出幾個方法。
使用 join
調用線程的.join()方法,就可以卡住主線程,直到這個子線程運行完畢才能讓主線程繼續運行后面的代碼。所以我們可以修改代碼為:
- import threading
- def get_url(url):
- result = requests.get(url, headers=HEADERS).text
- return result
- @app.get('/api/search')
- def search(keyword: str):
- google_thead = threading.Thread(target=get_url, 'Google 搜索地址')
- baidu_thread = threading.Thread(target=get_url, '百度搜索地址')
- bing_thread = threading.Thread(target=get_url, 'Bing搜索地址')
- google_thread.start()
- baidu_thread.start()
- bing_thread.start()
- google_thread.join()
- baidu_thread.join()
- bing_thread.join()
但等一等,我怎么拿到子線程的返回呢?在默認情況下,你確實拿不到返回的數據。所以你需要傳入一個東西去子線程接收結果。所以代碼可以改為:
- import threading
- def get_url(url, output):
- result = requests.get(url, headers=HEADERS).text
- output.append(result)
- @app.get('/api/search')
- def search(keyword: str):
- result = []
- google_thead = threading.Thread(target=get_url, args=['Google 搜索地址', result])
- baidu_thread = threading.Thread(target=get_url, args=['百度搜索地址', result])
- bing_thread = threading.Thread(target=get_url, args=['Bing搜索地址', result])
- google_thread.start()
- baidu_thread.start()
- bing_thread.start()
- google_thread.join()
- baidu_thread.join()
- bing_thread.join()
- combine(*result)
因為線程是共享內存的,所以他們可以直接修改主線程傳入的列表。
在使用.join()的時候,需要小心不要把.join()放錯了地方,否則你的多線程就會變成單線程。詳情可以看我的這篇文章: 等一等,你的多線程可別再亂 join 了。
ThreadPoolExecutor
Python 自帶了一個concurrent模塊,它就是專門用來處理并發問題的。我們也可以使用這個模塊中的ThreadPoolExecutor來解決問題:
- from concurrent.futures import ThreadPoolExecutor, as_completed
- def get_url(url):
- result = requests.get(url, headers=HEADERS).text
- return result
- @app.get('/api/search')
- def search(keyword: str):
- tasks = []
- with ThreadPoolExecutor() as executor:
- for url in ['Google 搜索地址', '百度搜索地址', 'Bing搜索地址']
- task = executor.submit(get_url, url)
- tasks.append(task)
- result = [x.result() for x in as_completed(tasks)]
- combine(*result)
- ...
concurrent.futures里面的as_completed函數接收一個列表,列表里面是多個并發任務。當所有并發任務都運行結束時,它才會返回一個可迭代對象。對它進行迭代以后,每個元素的.result()就是每個子線程運行的返回結果。
其他方法
除了上面兩個方法外,還可以使用multiprocessing.dummy里面的Pool來實現更簡單的多線程。
本文轉載自微信公眾號「未聞Code」,可以通過以下二維碼關注。轉載本文請聯系未聞Code公眾號。