Python線程安全之鎖、信號量
在Realpython看到一篇關于線程安全的文章,覺得非常哇塞,分享給大家,今天先講前半部分。
提到線程必須了解兩個術語:
? 并發Concurrency,系統具備處理多個任務的能力,它們的在執行在時間上重疊,但不一定同時發生。
? 并行Parallelism:多個任務利用多核CPU真正同時執行。
Python的線程是一個并發框架,線程并行運行的時候,每個線程執行代碼的一部分,Python解釋器在它們之間切換,將執行控制權交給每個線程。
理解線程并行
先簡單舉個例子:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def threaded_function():
for number in range(3):
print(f"Printing from {threading.current_thread().name}. {number=}")
time.sleep(0.1)
with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Worker") as executor:
for _ in range(4):
executor.submit(threaded_function)
先打印輸出:
Printing from Worker_0. number=0
Printing from Worker_1. number=0
Printing from Worker_2. number=0
Printing from Worker_3. number=0
Printing from Worker_0. number=1
Printing from Worker_2. number=1
Printing from Worker_1. number=1
Printing from Worker_3. number=1
Printing from Worker_0. number=2
Printing from Worker_2. number=2
Printing from Worker_1. number=2
Printing from Worker_3. number=2
啟動了四個線程,可以觀察到在Worker_0打印number=0之后 ,并不會立刻打印number=1,原因就在于要切換給其他線程運行,而且各個線程的運行順序是不一樣的。
如何做到這個的呢?因為python解析器會進行上下文切換,默認的間隔時間如下列代碼:
import sys
sys.getswitchinterval()
0.005
5毫秒的間隔并不意味著線程會精確地每5毫秒切換一次,而是意味著解釋器會在這些間隔內考慮切換到另一個線程,而代碼中的sleep()是為了增加了在此期間發生上下文切換的可能性。
什么是線程安全
由于上下文切換,程序在多線程環境中運行時可能會表現出意外行為,這就導致線程不安全問題,而如果代碼在多線程環境中運行時表現出確定性并產生期望的輸出,那么它就被認為是線程安全的。
線程安全問題通常源于兩個原因:
? 共享可變數據:線程共享父進程的內存,因此所有變量和數據結構在各線程之間是共享的。對這些共享數據進行修改時可能會引發錯誤。
? 非原子操作:多線程環境中,涉及多個步驟的操作可能會被上下文切換中斷,尤其是在執行過程中切換到其他線程時,容易導致意外結果。
1:GIL
在討論Python線程時,Python 3.12之前的GIL(Global Interpreter Lock)不可避免要提到。GIL是一個互斥鎖,目的是保護Python對象的訪問,防止多個線程同時執行Python字節碼。它阻止了真正的線程并行,尤其在CPU密集型任務中,多線程性能會受到嚴重限制。不過,這意味著對于I/O密集型任務,線程并行仍然適用。
由于GIL的存在,當某個操作能在單個字節碼指令中完成時,它是原子的。那么,Python是否因此天然線程安全?并非如此。因為I/O操作仍然可以并行執行,因此即使有GIL,訪問共享可變數據時依然需要鎖等同步機制確保線程安全。
GIL是否完全消除了Python的多線程并發能力?并沒有,Python支持通過多進程來實現真正的并行。
值得關注的是,從Python 3.13開始,Python提供了無GIL的解釋器,實現了真正的線程并行。但無論是否有GIL,編寫代碼時始終建議合理地保護線程安全——也就是說,不依賴GIL,主動保證線程安全。
2:競爭
現在來看看第二個核心概念,競爭條件發生在程序的結果依賴于不可控事件的順序或時間,如線程執行順序時,如果沒有適當的同步會導致程序出現不可預測的錯誤。
下面的例子就來模擬這種情況,兩個線程同時修改一個屬性:
import time
from concurrent.futures import ThreadPoolExecutor
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
def withdraw(self, amount):
if self.balance >= amount:
new_balance = self.balance - amount
time.sleep(0.1)
self.balance = new_balance
else:
raise ValueError("Insufficient balance")
account = BankAccount(1000)
with ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(account.withdraw, 500)
executor.submit(account.withdraw, 700)
print(f"Final account balance: {account.balance}")
先猜可能的結果,代碼可能會輸出:
Final account balance: 300
也可能會輸出:
Final account balance: 500
為什么會出現這樣的情況呢?這就是由于線程執行順序不一致導致的,如果先扣700,而第二個線程突然切換過來了,檢查余額夠,最終就扣了500,余額就變成500了,當然結果是錯誤的。
同步原語
為了解決線程不安全問題,Python的threading模塊提供了各種同步原語,以防止競爭條件并允許線程之間的協調。
同步原語會:
? 控制線程同時執行代碼塊
? 使多個代碼語句對線程來說是原子的
? 限制線程的并發訪問
? 在線程之間進行協調,并根據其他線程的狀態執行操作
接下去使用Python線程鎖和信號實現互斥。
使用Python線程鎖實現互斥
鎖是一種同步原語,可用于獨占訪問資源,一旦一個線程獲取了鎖,其他線程就不能再獲取它并繼續執行,直到鎖被釋放,可以使用鎖來封裝應該原子執行的語句或語句組。
python提供兩個lock相關的函數:
? 當一個線程調用.acquire()方法時,如果Lock對象已經被另一個線程鎖定,那么調用的線程會被阻塞,直到持有鎖的線程釋放鎖。
? release() 會釋放一個被線程獲取的鎖,如果嘗試釋放一個未鎖定的鎖,會引發RuntimeError。
如果使用with語句,Lock 對象可用作上下文管理器,可以自動獲取和釋放鎖。
為了解決上面代碼存在的問題,可以:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
self.account_lock = threading.Lock()
def withdraw(self, amount):
with self.account_lock:
if self.balance >= amount:
new_balance = self.balance - amount
print(f"Withdrawing {amount}...")
time.sleep(0.1) # Simulate a delay
self.balance = new_balance
else:
raise ValueError("Insufficient balance")
def deposit(self, amount):
with self.account_lock:
new_balance = self.balance + amount
print(f"Depositing {amount}...")
time.sleep(0.1) # Simulate a delay
self.balance = new_balance
account = BankAccount(1000)
with ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(account.withdraw, 700)
executor.submit(account.deposit, 1000)
executor.submit(account.withdraw, 300)
print(f"Final account balance: {account.balance}")
上述代碼通過鎖成功保證了線性安全。
如果由于代碼中的錯誤或疏忽導致鎖未正確釋放,可能會導致死鎖,即線程無限期地等待鎖被釋放。
死鎖的原因包括:
? 嵌套鎖獲取:如果一個線程嘗試獲取它已經持有的鎖,可能會發生死鎖,同一線程嘗試多次獲取相同的鎖會導致線程阻塞自身,這種情況在沒有外部干預的情況下無法解決。
? 多重鎖獲取:當使用多個鎖時,如果線程以不一致的順序獲取這些鎖,可能會發生死鎖,如果兩個線程各自持有一個鎖并等待對方釋放鎖,那么兩個線程都無法繼續,從而導致死鎖。
對于多重鎖可以使用可重入鎖RLock解決,當持有線程再次請求鎖時,它不會阻塞,允許線程在釋放鎖之前多次獲取鎖,這在遞歸函數或線程需要重新進入已鎖定資源的情況下非常有用,相對來說,RLock因為要跟蹤同一線程獲取鎖的次數,會有性能開銷。
Semaphores信號量
在資源數量有限且多個線程嘗試訪問這些有限資源時非常有用,它使用一個計數器來限制多個線程對臨界區的訪問,每次調用.acquire() 都會將信號量的計數器減少一個,當計數器達到零時,再.acquire() 調用將被阻塞。
舉一個例子,多個客戶在銀行等待有限數量的柜員服務:
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
# Semaphore with a maximum of 2 resources (tellers)
teller_semaphore = threading.Semaphore(2)
def now():
return time.strftime("%H:%M:%S")
def serve_customer(name):
print(f"{now()}: {name} is waiting for a teller.")
with teller_semaphore:
print(f"{now()}: {name} is being served by a teller.")
time.sleep(random.randint(1, 3))
print(f"{now()}: {name} is done being served.")
customers = [
"Customer 1",
"Customer 2",
"Customer 3",
"Customer 4",
"Customer 5",
]
with ThreadPoolExecutor(max_workers=3) as executor:
for customer_name in customers:
thread = executor.submit(serve_customer, customer_name)
print(f"{now()}: All customers have been served.")
代碼很好理解,當某個線程達到計數器上限后,它會被阻塞,直到其他線程在with語句中因為完成服務而釋放,但不管怎么樣,每次只有三個客戶被服務。
參考:https://realpython.com/python-thread-lock/#using-python-threading-locks-for-mutual-exclusion