深度解密協程鎖、信號量以及線程鎖的實現原理
關于什么是信號量,相信大家都知道,那么本文便從源碼的角度來看看信號量是怎么實現的。不過在說信號量之前,必須先剖析一下鎖,理解了鎖才能更好地理解信號量。
那什么是鎖呢?如果程序中某個部分在并發操作時會出現意想不到的結果(比如操作一個共享的數據結構),那么該部分就需要通過鎖保護起來,而被鎖保護起來的部分叫做臨界區。
線程在進入臨界區之前必須先獲取鎖,然后才能操作共享資源。而鎖一旦被獲取,那么其它線程再嘗試獲取鎖,就會陷入阻塞,直到鎖被釋放。
圖片
通過鎖,我們能確保同一時刻只能有一個線程操作共享資源,從而很好地解決資源競爭問題。這里的鎖指的是互斥鎖,也被稱為排它鎖。
而在 Python 里面,鎖可以通過 asyncio 和 threading 模塊來創建,這兩個模塊都提供了鎖,一個是協程鎖,一個是線程鎖,當然也包括信號量。
import asyncio
import threading
lock1 = asyncio.Lock()
lock2 = threading.Lock()
當我們對類 Lock 實例化,便可以得到鎖,然后鎖有兩個常用方法。
- acquire():獲取鎖;
- release():釋放鎖;
API 非常簡單,我們先來看看協程里面的鎖,以及信號量。
協程鎖和信號量
之前在介紹 asyncio 的 Future 和 Task 時說過,Future 對象可以看作是一個容器,它保存了在未來某個時刻才會出現的結果。
如果 Future 對象里面還沒有結果集,那么它就處于未完成狀態,否則處于已完成狀態。
import asyncio
future = asyncio.Future()
# 是否完成
print(future.done())
"""
False
"""
# 因為 future 此時還沒有結果集,所以是未完成狀態(PENDING)
# 設置結果集
future.set_result("S 老師不希望你們為了她而兩敗俱傷")
# 由于設置了結果集,所以變成已完成狀態(FINISHED)
print(future.done())
"""
True
"""
# 獲取結果
print(future.result())
"""
S 老師不希望你們為了她而兩敗俱傷
"""
問題來了,如何在 future 完成時立刻拿到結果呢?總不能一直調用 done 方法輪詢吧。
很簡單,我們可以對 future 使用 await 表達式,如果 future 內部還沒有結果集,那么 await 會處于阻塞狀態,否則不會阻塞,并且還會將值取出來。
import asyncio
async def delay(future, seconds):
await asyncio.sleep(seconds)
print("給 future 設置結果集")
future.set_result(666)
async def main():
# 創建一個 future
future = asyncio.Future()
loop = asyncio.get_running_loop()
# 創建一個任務,扔到事件循環
loop.create_task(delay(future, 3))
print("await future 會陷入阻塞,因為它內部還沒有結果集")
# 該表達式會返回 666,因為給 future 設置的結果是 666
await future
print(f"3 秒后結束阻塞,因為 delay 協程內部給 future 設置了結果集")
asyncio.run(main())
"""
await future 會陷入阻塞,因為它內部還沒有結果集
給 future 設置結果集
3 秒后結束阻塞,因為 delay 協程內部給 future 設置了結果集
"""
而協程在進入事件循環時會自動創建一個 future,并將協程和 future 組合起來得到任務,而 await 一個任務等價于 await future。當協程沒有執行完畢時會處于阻塞,而協程執行完畢時會將返回值設置在 future 中,然后 await 表達式會拿到里面的結果。
在實際編碼中,我們一般很少手動創建 Future 對象(future),但 Future 和 asyncio 的實現密切相關,其中就包括了鎖。
當協程在獲取鎖時,如果發現鎖已被獲取,那么如何陷入阻塞呢?當鎖被釋放時,它又如何解除阻塞呢?答案就是通過 future。
假設協程 1 和協程 2 都要獲取鎖,它們都會調用鎖的 acquire 方法。其中協程 1 先獲取到,那么協程 2 就會創建一個 future 并 await。由于 future 內部還沒有結果集,因此協程 2 會處于阻塞。當協程 1 釋放鎖時,會給協程 2 創建的 future 設置一個結果,從而讓協程 2 解除阻塞、獲取到鎖。
我們手動實現一下鎖。
import asyncio
from collections import deque
class Lock:
def __init__(self):
# 保存創建的 future
self._waiters = deque()
# 鎖是否已被獲取
self._locked = False
async def acquire(self):
# 如果鎖沒有被獲取,那么獲取鎖
if not self._locked:
self._locked = True
return True
# 否則說明鎖已被獲取,創建一個 future
future = asyncio.Future()
# 將它放在雙端隊列里面
self._waiters.append(future)
# 此時獲取鎖的協程就會陷入阻塞,等待其它協程喚醒
await future
# 如果解除阻塞,意味著該協程獲取到鎖了
self._locked = True
return True
def release(self):
# 釋放鎖,如果發現鎖沒被獲取,說明對鎖進行了二次釋放
if not self._locked:
raise RuntimeError("鎖沒有被獲取")
# 將鎖的狀態改成 False,表示鎖被釋放了
self._locked = False
if len(self._waiters) == 0:
return
# 從雙端隊列 deque 的左側彈出 future
# 這個 future 就是某個協程在獲取不到鎖時創建的
# 并通過 await future 讓自身陷入阻塞狀態,等待被喚醒
future = self._waiters.popleft()
# 拿到 future 之后,執行 future.set_result(),也就是設置結果集
# 那么對應的協程就會解除阻塞,從而獲取鎖
future.set_result(True)
# 注意:因為 future 是從右邊添加的,所以要從 deque 的左側彈出
# 因為先獲取鎖的協程要優先解除阻塞
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.release()
整個過程非常簡單,就是在獲取不到鎖時,創建一個 Future 對象并 await,此時就會陷入阻塞。當然獲取鎖的協程可能有很多,它們創建的 future 會保存在一個雙端隊列里面。
而拿到鎖的協程,在操作完臨界區并釋放鎖時,會從雙端隊列的左側彈出一個 future,并為其設置結果集。那么創建該 future 的協程就會解除阻塞,從而獲取到鎖。
因此這就是 asyncio 鎖的實現方式,一點都不神秘。當然 asyncio 內部還做了一些異常檢測,以及檢測 future 是否已取消等等,我們這里省略了。有興趣可以看一看 asyncio 內部鎖的實現細節,整體邏輯和我們這里基本一致,并且我們這里手動實現的鎖在大部分場景下和 asyncio 的鎖都是等效的。
然后補充一點,你在使用 asyncio 鎖的時候,一定不要以全局變量的形式創建。
import asyncio
lock = asyncio.Lock()
async def a():
async with lock:
print("協程 a 成功獲取了鎖, 并進入臨界區執行操作")
await asyncio.sleep(2)
print("協程 a 釋放了鎖")
async def b():
async with lock:
print("協程 b 成功獲取了鎖, 并進入臨界區執行操作")
await asyncio.sleep(2)
print("協程 b 釋放了鎖")
async def main():
await asyncio.gather(a(), b())
asyncio.run(main())
如果這樣做,很快會看到崩潰的發生,并報告多個事件循環的錯誤:
RuntimeError: ..... attached to a different loop
這是 asyncio 庫的一個令人困惑的地方,而且這種現象也不是鎖特有的,asyncio 中的大多數對象在創建時都會提供一個可選的 loop 參數,允許你指定要運行的事件循環。
當未提供此參數時,asyncio 嘗試獲取當前正在運行的事件循環,如果沒有,則創建一個新的事件循環。在上例中,創建鎖的同時會創建一個事件循環,因為創建鎖時還沒有事件循環。然后 asyncio.run(main()) 會創建第二個事件循環,試圖使用鎖時,這兩個獨立的事件循環就會混合在一起導致崩潰。
這種行為比較棘手,因此在 Python 3.10 中會移除 loop 參數,這種令人困惑的行為也會消失。但在 3.10 之前,在使用全局 asyncio 變量時需要認真考慮這些情況。
說完了鎖,再來說說信號量。鎖負責保證同一時刻只能有一個協程去操作臨界區,而信號量在創建時會接收一個初始值 value,可以保證同一時刻最多有 value 個協程去操作臨界區。
因此可以把鎖看成是初始值 value 等于 1 的信號量,它在源碼中的實現和鎖基本是類似的,我們也手動實現一下。
import asyncio
from collections import deque
class Semaphore:
def __init__(self, value=1):
self._waiters = deque()
# 可以把 self._value 看成是令牌的數量
# 每當一個協程進入臨界區,令牌數減 1,離開臨界區,令牌數加 1
# 如果 self._value 小于等于 0,說明令牌用光了,此時就不允許進入臨界區
self._value = value
@property
def locked(self):
return self._value <= 0
async def acquire(self):
# 如果 self._value > 0,說明可以進入臨界區
if not self.locked:
self._value -= 1 # self._value 要減 1
return True
# 如果 self._value <= 0,說明此時不能進去臨界區,必須等待某個協程從臨界區出來
# 那么和鎖一樣,也是創建一個 future 并放在雙端隊列里面
future = asyncio.Future()
self._waiters.append(future)
# 此時獲取信號量的協程會陷入阻塞
await future
# 解除阻塞,意味著該協程獲取到信號量了
self._value -= 1
return True
def release(self):
# 釋放信號量,說白了就是將 self._value 加 1
self._value += 1
if len(self._waiters) == 0:
return
future = self._waiters.popleft()
future.set_result(True)
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.release()
信號量和鎖的實現方式是一樣的,鎖可以看成是 value 為 1 的信號量。當協程進入臨界區,value 的值會減少 1,離開臨界區 value 的值會增加 1。如果 value 為 0,那么后續協程就不允許進入臨界區了,必須等到某個協程從臨界區出來。
說到這,再來補充一個有界信號量,因為信號量有一個問題。
import asyncio
from asyncio import Semaphore
import time
async def bar(sem: Semaphore):
async with sem:
await asyncio.sleep(3)
async def main():
# 每次允許兩個協程進入臨界區
sem = Semaphore(2)
# 創建 4 個任務
task = [asyncio.create_task(bar(sem)) for _ in range(4)]
# 直接對 sem 執行 release
sem.release()
sem.release()
await asyncio.gather(*task)
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print(f"總耗時: {end - start}")
"""
總耗時: 3.003426834
"""
創建了 4 個任務,每次只允許兩個協程進入臨界區,因此總耗時應該是 6 秒才對。但問題是我們創建完信號量之后,調用了兩次 release 方法,將內部的 value 值增加了 2,此時信號量就變成了同時允許 4 個協程進入臨界區。
因此和鎖不一樣,鎖一旦被釋放,就不能再二次釋放。而信號量被釋放,其實就是將內部的 value 加 1,并且不會對內部的 value 進行檢測。
import asyncio
from asyncio import Semaphore
async def main():
sem = Semaphore(2)
print(f"before value: {sem._value}")
for _ in range(100):
sem.release()
print(f"after value: {sem._value}")
asyncio.run(main())
"""
before value: 2
after value: 102
"""
不過這個問題基本很少發生,當然也可以使用 async with 語句,這樣獲取和釋放一定是成對出現的。
而有界信號量在信號量的基礎上做了一層檢測,如果在 release 的時候發現 value 已經達到了初始值,那么會報錯。
圖片
有界信號量會將初始值 value 單獨保存起來,如果釋放時發現 value 大于等于初始值,那么報錯。但是注意:有界信號量依舊可以多次 release,不過我們基本不會這么干,因為獲取和釋放應該是成對出現的。
以上我們就說完了協程里面的鎖和信號量,再來看看線程提供的。
線程鎖和信號量
線程鎖可以通過 threading 模塊創建。
import threading
lock = threading.Lock()
注意:Lock 并不是一個類,而是一個函數,看一下源代碼。
Lock = _allocate_lock
# threading.Lock() 其實就是 _thread.allocate_lock()
_allocate_lock = _thread.allocate_lock
調用 _thread.allocate_lock() 時會在內部創建鎖,而鎖是由 _thread 模塊實現的。
import threading
import _thread
lock = threading.Lock()
print(type(lock))
"""
<class '_thread.lock'>
"""
lock = _thread.allocate_lock()
print(type(lock))
"""
<class '_thread.lock'>
"""
所以線程鎖其實是一個 _thread.lock 對象。
補充一下,Python 有很多的模塊是由 C 實現的,因為它們和性能密切相關,編譯之后會內嵌在解釋器里面。舉個例子:
import random, _random
import re, _sre
import ssl, _ssl
import io, _io
import bisect, _bisect
import heapq, _heapq
import asyncio, _asyncio
import threading, _thread
這些 C 實現的模塊,名字前面一般會帶有一個下滑線,它們內嵌在解釋器里面,你在 Lib 目錄下是找不到的。但我們不需要直接使用這些模塊,解釋器會提供相應的 Python 模塊對其進行封裝。
我們只需要導入 Python 模塊即可,在內部會調用具體的 C 實現,以 io 模塊為例。
圖片
這些類都是 _io 實現的,而 io 只是做了一層封裝,因此在實際編碼時會使用 C 實現的 _io 模塊里的邏輯。
再比如內置函數 open,它其實就是 io.open,而 io 里面的 open 是從 _io 導入進來的。
import io
import _io
print(open is io.open is _io.open) # True
好了,說了這么多只是想表示線程鎖的具體實現不在 threading 里面,而是在 _thread 里面。_thread 是一個 C 實現的模塊,我們需要到解釋器里面才能看到具體實現。
在 Modules/_threadmodule.c 中,有一個結構體實例 Locktype,它便是 _thread.lock 這個類的底層實現。
圖片
_thread.lock 實例化后會得到鎖,鎖在底層對應的是 lockobject 結構體。
// _threadmodule.c
typedef struct {
PyObject_HEAD
PyThread_type_lock lock_lock;
PyObject *in_weakreflist;
char locked;
} lockobject;
// pythread.h
typedef void *PyThread_type_lock;
解釋一下這個結構體。
PyObject_HEAD
每個對象都具備的頭部信息,它包含了對象的引用計數和類型。
lock_lock
PyThread_type_lock 是 void * 的類型別名,所以 lock_lock 是一個 void * 類型的指針,該指針指向了真正的鎖,這個鎖是底層操作系統提供的。
和協程鎖不同,由于操作系統感知不到協程,因此協程鎖是基于 Future 對象實現的。但線程鎖則是基于操作系統實現的,當 Python 代碼創建鎖、獲取鎖、解鎖時,會通過 lock_lock 指針將這些操作轉發到具體的鎖實現上。
in_weakreflist
用于創建弱引用,關于什么是弱引用,我在之前的文章中介紹過。
locked
用于標記鎖狀態,把它當成 Python 的布爾值即可,值為 1 表示鎖已被獲?。ㄒ焰i定),0 表示未被獲?。ㄎ存i定)。
這幾個字段應該很好理解,然后我們來看一下鎖的具體方法,那么方法都定義在哪呢?我們說過,實例對象有哪些行為,取決于類型對象定義了哪些操作。
因此鎖的操作都定義在 Locktype 里面,由內部的 tp_methods 字段負責維護。
圖片
該字段被賦值為 lock_methods,所以鎖的方法都在 lock_methods 數組中。
圖片
以上就是鎖能夠使用的方法,我們來驗證一下。
import threading
lock = threading.Lock()
# acquire_lock 和 acquire 基本是等價的
# release_lock 和 release 也基本是等價的
# 不過我們一般都會使用 acquire 和 lock
lock.acquire_lock() # 獲取鎖
lock.release_lock() # 釋放鎖
lock.acquire() # 獲取鎖
lock.release() # 釋放鎖
# 同理 locked_lock 和 locked 也是等價的
# 表示鎖是否被獲?。ㄒ焰i定),不過我們一般使用 locked
print(lock.locked_lock())
print(lock.locked())
lock.acquire()
print(lock.locked_lock())
print(lock.locked())
lock.release()
"""
False
False
True
True
"""
# 還提供了上下文管理,等價于 lock.acquire + lock.release
with lock:
pass
好了,接下來我們看看 acquire 方法,也就是鎖是怎么獲取的。
static PyObject *
lock_PyThread_acquire_lock(
lockobject *self,
PyObject *args,
PyObject *kwds
){
_PyTime_t timeout; // 超時時間
// 一個枚舉,表示鎖狀態,有三個可選值
// PY_LOCK_FAILURE:表示因為鎖已被持有,而獲取失敗
// PY_LOCK_ACQUIRED:表示鎖可用,并成功獲取鎖
// PY_LOCK_INTR:表示獲取鎖的操作被中斷,比如抵達超時時間
PyLockStatus r;
// 參數解析,該方法接收一個 timeout 參數
if (lock_acquire_parse_args(args, kwds, &timeout) < 0)
return NULL;
// 獲取鎖,并指定一個超時時間,不傳則表示沒有超時時間
// 那么在獲取不到鎖時,會無限等待
r = acquire_timed(self->lock_lock, timeout);
// 如果返回的狀態為 PY_LOCK_INTR,說明達到超時時間
// 因此獲取鎖的操作被中斷,并且會拋出異常
if (r == PY_LOCK_INTR) {
return NULL;
}
// 如果返回的狀態為 PY_LOCK_ACQUIRED,表示鎖獲取成功
// 將鎖的 locked 字段設置為 1,表示鎖已被獲取
if (r == PY_LOCK_ACQUIRED)
self->locked = 1;
// 如果以上兩種狀態都不是,那么說明獲取失敗了
// 將 r == PY_LOCK_ACQUIRED 轉成布爾值返回
// 獲取成功返回 True,獲取失敗返回 False
return PyBool_FromLong(r == PY_LOCK_ACQUIRED);
}
整個過程仍然很簡單,因此我們看到協程鎖和線程鎖的實現是類似的,它們都有一個 locked 字段用于表示鎖是否已被獲取。
只不過協程鎖是基于 Future 對象實現的,當 await future 陷入阻塞時,表示鎖已被其它協程獲取。當解除阻塞時,代表鎖被釋放了,自己獲取到鎖。
而線程鎖是基于操作系統實現的,它本質上是對操作系統提供的鎖做了一個封裝。Python 線程在獲取鎖時,底層會獲取操作系統的鎖。
而操作系統的鎖是怎么獲取的呢?在源碼中使用的是 acquire_time 函數,它接收一個指針和一個超時時間。該指針便是 lockobject 的 lock_lock 字段,類型是 void *,它指向了操作系統提供的鎖實現。
圖片
acquire_time 函數做了一些參數處理后,又調用了 PyThread_acquire_lock_timed 函數,顯然獲取鎖的邏輯位于該函數里面。
PyThread_acquire_lock_timed 函數在不同平臺有著不同的實現,因為不同操作系統的鎖實現是不是一樣的,所以源碼中使用 void *。
圖片
我們以 Windows 系統為例:
圖片
雖然不同系統的函數實現不一樣,但參數是一致的。
- aLock:void * 指針,指向操作系統提供的鎖;
- microseconds:等待鎖的時間,以微妙為單位。如果值是負數,表示無限等待,直到獲取鎖;
- intr_flag:如果設置為 1,那么當等待過程中出現了信號中斷時,函數會提前返回。
函數的核心實現如下:
圖片
又調用了 EnterNonRecursiveMutex 函數,該函數是真正獲取鎖的邏輯,參數 aLock 指向了操作系統的互斥鎖。前面說過,不同系統有著不同的鎖實現,所以具體使用時需要轉換。在 Windows 系統上,它被轉成了 PNRMUTEX。
typedef struct _NRMUTEX
{
// 對操作系統互斥鎖的封裝
PyMUTEX_T cs;
// 對條件變量的封裝,用于線程間的同步
// 允許線程在條件不滿足時等待,條件滿足時由其它線程通知等待的線程
// 條件變量一般和互斥鎖一起使用,避免競爭條件和死鎖
PyCOND_T cv;
// 標記互斥鎖是否已被獲取,1 表示已被獲取,0 表示未被獲取
int locked;
} NRMUTEX;
typedef NRMUTEX *PNRMUTEX;
所以 lockobject 的 lock_lock 指針指向的其實依舊不是 OS 互斥鎖,而是一個結構體實例,結構體內部的字段 cs 封裝的才是 OS 互斥鎖。
圖片
lockobject 是線程鎖,也就是 Python 代碼中使用的鎖的底層實現,而 NRMUTEX 則是封裝了操作系統提供的互斥鎖。注意這里面的兩個 locked,它們都用于標記鎖是否已被獲取。
最后來看看 EnterNonRecursiveMutex 函數的具體邏輯。
DWORD
EnterNonRecursiveMutex(PNRMUTEX mutex,
DWORD milliseconds)
{
DWORD result = WAIT_OBJECT_0;
// 對 OS 互斥鎖進行鎖定,用于保護共享數據,如果鎖定失敗直接返回
if (PyMUTEX_LOCK(&mutex->cs))
return WAIT_FAILED;
// 如果鎖定成功,那么將 locked 字段設置為 1,表示互斥鎖被獲取
// 但如果發現 locked 已經為 1 了,則說明已有別的線程將 locked 修改為 1
// 那么當前線程就要等待,直到 locked 不為 1(鎖被釋放)
if (milliseconds == INFINITE) {
// 無限等待
while (mutex->locked) {
if (PyCOND_WAIT(&mutex->cv, &mutex->cs)) {
result = WAIT_FAILED;
break;
}
}
} else if (milliseconds != 0) {
// 有時間限制的等待
ULONGLONG now, target = GetTickCount64() + milliseconds;
while (mutex->locked) {
if (PyCOND_TIMEDWAIT(
&mutex->cv, &mutex->cs,
(long long)milliseconds*1000) < 0)
{
result = WAIT_FAILED;
break;
}
now = GetTickCount64();
if (target <= now)
break;
milliseconds = (DWORD)(target-now);
}
}
// 在被喚醒之后,說明當前線程獲取互斥鎖成功,于是將 locked 改成 1
if (!mutex->locked) {
mutex->locked = 1;
result = WAIT_OBJECT_0;
} else if (result == WAIT_OBJECT_0)
result = WAIT_TIMEOUT;
// 這里必須將操作系統的鎖釋放掉,因為對于外界的線程而言,
// 鎖是否被獲?。ㄦi定),取決于 locked 字段是否為 1
PyMUTEX_UNLOCK(&mutex->cs);
return result;
}
代碼邏輯有一些讓人疑惑的地方,下面解釋一下。Python 里面調用 lock.acquire() 方法時,表示要獲取線程鎖。但獲取線程鎖之前,要先獲取 OS 互斥鎖,如果獲取不到,那么壓根不允許進入臨界區。
但解釋器在互斥鎖的基礎上又封裝了一層,如果獲取到了互斥鎖,還要將 locked 字段修改為 1。因為從代碼邏輯上講,無論是線程鎖還是互斥鎖,只有當它們內部的 locked 字段為 1 時,才算是獲取了鎖。
所以將互斥鎖的 locked 字段修改為 1 之后,后續還要將線程鎖的 locked 字段修改為 1,這樣才算是獲取了線程鎖。
到這里估計可能有人會產生一個疑問,為啥函數在一開始要獲取系統的互斥鎖,最后又釋放掉,這豈不是多此一舉?
if (PyMUTEX_LOCK(&mutex->cs))
return WAIT_FAILED;
//...
PyMUTEX_UNLOCK(&mutex->cs);
直接檢測 locked 字段是否等于 1 不就行了嗎?其實原因有三個:
- 保護共享狀態:操作系統的互斥鎖 mutex-> cs 用于保護共享狀態 mutex -> locked 的讀寫,在多線程環境中,任何對共享狀態的訪問都要同步,以防止競態條件;
- 條件變量的同步:在使用條件變量 mutex -> cv 時,通常需要結合互斥鎖使用,條件變量的等待和通知需要在互斥鎖的保護下進行,以保證操作的原子性;
- 避免忙等待:如果只使用 mutex -> locked 進行檢查,可能會陷入忙等待,即不斷地檢查鎖狀態而占用 CPU 資源。使用互斥鎖和條件變量可以讓線程在等待時被掛起,從而更有效地利用 CPU;
所以解釋器為 OS 互斥鎖引入了一個自定義的鎖狀態 locked,OS 互斥鎖提供了對 locked 的基本保護,因為多個線程都要修改它。而自定義的鎖狀態 locked 則用于實現同步邏輯,如果 locked 為 1,我們就認為鎖被獲取了,locked 為 0,鎖就沒有被獲取。
協程鎖和線程鎖都是如此,所謂的獲取鎖、釋放鎖都是在修改 locked 字段的值。只不過在等待的時候,協程鎖使用的是 Future 對象,而線程鎖使用的是操作系統提供的互斥鎖和條件變量。
所以上面代碼中的 PyMUTEX_LOCK 通過之后,還要檢測 locked 字段是否等于 1,代碼片段如下。
while (mutex->locked) {
if (PyCOND_WAIT(&mutex->cv, &mutex->cs)) {
result = WAIT_FAILED;
break;
}
//...
如果 locked 是 1,說明互斥鎖已經被獲取了,當前線程要進行等待,直到 locked 字段的值為 0。當其它線程釋放鎖時,會將 locked 字段修改為 0,并通過條件變量喚醒當前線程。
該線程醒來后檢測到 locked 為 0,就知道互斥鎖已被釋放,自己可以獲取了,于是再將 locked 字段修改為 1。
說完了線程鎖的獲取,再來看看線程鎖的釋放,所謂釋放,其實就是將 locked 字段修改為 0 而已。
圖片
釋放互斥鎖的邏輯最終會調用如下函數:
圖片
修改 locked 是不安全的,需要加鎖保護。所以 OS 互斥鎖就是為了保護 locked 變量的修改,再配合條件變量實現阻塞等待以及自動喚醒,但從代碼邏輯上講,將 locked 字段設置為 0,才算是真正釋放了鎖。
這部分邏輯稍微有點繞,總之記住一個重點:所謂的鎖,它的核心就是結構體的一個字段,這里是 locked。如果字段的值為 1,表示鎖被獲取了,字段的值為 0,表示鎖沒有被獲取。
- 而獲取鎖,本質上就是將 locked 字段修改為 1;
- 而釋放鎖,本質上就是將 locked 字段修改為 0;
當鎖沒有被獲取時,那么線程在獲取鎖和釋放鎖時的邏輯可以簡化為如下:
圖片
但實際情況會有多個線程一起競爭鎖,因此為了保護這個共享字段,以及實現阻塞等待和自動喚醒,解釋器使用了操作系統的互斥鎖和條件變量。
小結
以上我們就剖析了協程鎖、信號量以及線程鎖的實現原理,至于線程里面的信號量,它的原理和協程的信號量是一樣的,只是實現方式不一樣。
圖片
線程的信號量包含了一個初始值 value,但它在實現阻塞等待以及喚醒的時候用的是條件變量,而條件變量的實現依賴于鎖。簡單來說,獲取信號量的時候,self._value 會減 1,釋放信號量的時候,self._value 會加 1。
當 self._value 為 0 時,獲取信號量會陷入阻塞,而當某個線程退出臨界區釋放信號量的時候,會通過條件變量的 notify 機制喚醒阻塞的線程。
關于條件變量,我們以后再分析,有點餓了。
另外進程也有鎖和信號量,這里也先不討論了,有點困了。