成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

深度解密協程鎖、信號量以及線程鎖的實現原理

開發 前端
實際情況會有多個線程一起競爭鎖,因此為了保護這個共享字段,以及實現阻塞等待和自動喚醒,解釋器使用了操作系統的互斥鎖和條件變量。

關于什么是信號量,相信大家都知道,那么本文便從源碼的角度來看看信號量是怎么實現的。不過在說信號量之前,必須先剖析一下鎖,理解了鎖才能更好地理解信號量。

那什么是鎖呢?如果程序中某個部分在并發操作時會出現意想不到的結果(比如操作一個共享的數據結構),那么該部分就需要通過鎖保護起來,而被鎖保護起來的部分叫做臨界區。

線程在進入臨界區之前必須先獲取鎖,然后才能操作共享資源。而鎖一旦被獲取,那么其它線程再嘗試獲取鎖,就會陷入阻塞,直到鎖被釋放。

圖片圖片

通過鎖,我們能確保同一時刻只能有一個線程操作共享資源,從而很好地解決資源競爭問題。這里的鎖指的是互斥鎖,也被稱為排它鎖。

而在 Python 里面,鎖可以通過 asyncio 和 threading 模塊來創建,這兩個模塊都提供了鎖,一個是協程鎖,一個是線程鎖,當然也包括信號量。

import asyncio
import threading

lock1 = asyncio.Lock()
lock2 = threading.Lock()

當我們對類 Lock 實例化,便可以得到鎖,然后鎖有兩個常用方法。

  • acquire():獲取鎖;
  • release():釋放鎖;

API 非常簡單,我們先來看看協程里面的鎖,以及信號量。

協程鎖和信號量

之前在介紹 asyncio 的 Future 和 Task 時說過,Future 對象可以看作是一個容器,它保存了在未來某個時刻才會出現的結果。

如果 Future 對象里面還沒有結果集,那么它就處于未完成狀態,否則處于已完成狀態。

import asyncio

future = asyncio.Future()
# 是否完成
print(future.done())
"""
False
"""
# 因為 future 此時還沒有結果集,所以是未完成狀態(PENDING)
# 設置結果集
future.set_result("S 老師不希望你們為了她而兩敗俱傷")
# 由于設置了結果集,所以變成已完成狀態(FINISHED)
print(future.done())
"""
True
"""
# 獲取結果
print(future.result())
"""
S 老師不希望你們為了她而兩敗俱傷
"""

問題來了,如何在 future 完成時立刻拿到結果呢?總不能一直調用 done 方法輪詢吧。

很簡單,我們可以對 future 使用 await 表達式,如果 future 內部還沒有結果集,那么 await 會處于阻塞狀態,否則不會阻塞,并且還會將值取出來。

import asyncio

async def delay(future, seconds):
    await asyncio.sleep(seconds)
    print("給 future 設置結果集")
    future.set_result(666)

async def main():
    # 創建一個 future
    future = asyncio.Future()
    loop = asyncio.get_running_loop()
    # 創建一個任務,扔到事件循環
    loop.create_task(delay(future, 3))
    print("await future 會陷入阻塞,因為它內部還沒有結果集")
    # 該表達式會返回 666,因為給 future 設置的結果是 666
    await future
    print(f"3 秒后結束阻塞,因為 delay 協程內部給 future 設置了結果集")

asyncio.run(main())
"""
await future 會陷入阻塞,因為它內部還沒有結果集
給 future 設置結果集
3 秒后結束阻塞,因為 delay 協程內部給 future 設置了結果集
"""

而協程在進入事件循環時會自動創建一個 future,并將協程和 future 組合起來得到任務,而 await 一個任務等價于 await future。當協程沒有執行完畢時會處于阻塞,而協程執行完畢時會將返回值設置在 future 中,然后 await 表達式會拿到里面的結果。

在實際編碼中,我們一般很少手動創建 Future 對象(future),但 Future 和 asyncio 的實現密切相關,其中就包括了鎖。

當協程在獲取鎖時,如果發現鎖已被獲取,那么如何陷入阻塞呢?當鎖被釋放時,它又如何解除阻塞呢?答案就是通過 future。

假設協程 1 和協程 2 都要獲取鎖,它們都會調用鎖的 acquire 方法。其中協程 1 先獲取到,那么協程 2 就會創建一個 future 并 await。由于 future 內部還沒有結果集,因此協程 2 會處于阻塞。當協程 1 釋放鎖時,會給協程 2 創建的 future 設置一個結果,從而讓協程 2 解除阻塞、獲取到鎖。

我們手動實現一下鎖。

import asyncio
from collections import deque

class Lock:

    def __init__(self):
        # 保存創建的 future
        self._waiters = deque()
        # 鎖是否已被獲取
        self._locked = False

    async def acquire(self):
        # 如果鎖沒有被獲取,那么獲取鎖
        if not self._locked:
            self._locked = True
            return True
        # 否則說明鎖已被獲取,創建一個 future
        future = asyncio.Future()
        # 將它放在雙端隊列里面
        self._waiters.append(future)
        # 此時獲取鎖的協程就會陷入阻塞,等待其它協程喚醒
        await future
        # 如果解除阻塞,意味著該協程獲取到鎖了
        self._locked = True
        return True

    def release(self):
        # 釋放鎖,如果發現鎖沒被獲取,說明對鎖進行了二次釋放
        if not self._locked:
            raise RuntimeError("鎖沒有被獲取")
        # 將鎖的狀態改成 False,表示鎖被釋放了
        self._locked = False
        if len(self._waiters) == 0:
            return
        # 從雙端隊列 deque 的左側彈出 future
        # 這個 future 就是某個協程在獲取不到鎖時創建的
        # 并通過 await future 讓自身陷入阻塞狀態,等待被喚醒
        future = self._waiters.popleft()
        # 拿到 future 之后,執行 future.set_result(),也就是設置結果集
        # 那么對應的協程就會解除阻塞,從而獲取鎖
        future.set_result(True)
        # 注意:因為 future 是從右邊添加的,所以要從 deque 的左側彈出
        # 因為先獲取鎖的協程要優先解除阻塞

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.release()

整個過程非常簡單,就是在獲取不到鎖時,創建一個 Future 對象并 await,此時就會陷入阻塞。當然獲取鎖的協程可能有很多,它們創建的 future 會保存在一個雙端隊列里面。

而拿到鎖的協程,在操作完臨界區并釋放鎖時,會從雙端隊列的左側彈出一個 future,并為其設置結果集。那么創建該 future 的協程就會解除阻塞,從而獲取到鎖。

因此這就是 asyncio 鎖的實現方式,一點都不神秘。當然 asyncio 內部還做了一些異常檢測,以及檢測 future 是否已取消等等,我們這里省略了。有興趣可以看一看 asyncio 內部鎖的實現細節,整體邏輯和我們這里基本一致,并且我們這里手動實現的鎖在大部分場景下和 asyncio 的鎖都是等效的。

然后補充一點,你在使用 asyncio 鎖的時候,一定不要以全局變量的形式創建。

import asyncio

lock = asyncio.Lock()

async def a():
    async with lock:
        print("協程 a 成功獲取了鎖, 并進入臨界區執行操作")
        await asyncio.sleep(2)
    print("協程 a 釋放了鎖")

async def b():
    async with lock:
        print("協程 b 成功獲取了鎖, 并進入臨界區執行操作")
        await asyncio.sleep(2)
    print("協程 b 釋放了鎖")

async def main():
    await asyncio.gather(a(), b())

asyncio.run(main())

如果這樣做,很快會看到崩潰的發生,并報告多個事件循環的錯誤:

RuntimeError: ..... attached to a different loop

這是 asyncio 庫的一個令人困惑的地方,而且這種現象也不是鎖特有的,asyncio 中的大多數對象在創建時都會提供一個可選的 loop 參數,允許你指定要運行的事件循環。

當未提供此參數時,asyncio 嘗試獲取當前正在運行的事件循環,如果沒有,則創建一個新的事件循環。在上例中,創建鎖的同時會創建一個事件循環,因為創建鎖時還沒有事件循環。然后 asyncio.run(main()) 會創建第二個事件循環,試圖使用鎖時,這兩個獨立的事件循環就會混合在一起導致崩潰。

這種行為比較棘手,因此在 Python 3.10 中會移除 loop 參數,這種令人困惑的行為也會消失。但在 3.10 之前,在使用全局 asyncio 變量時需要認真考慮這些情況。

說完了鎖,再來說說信號量。鎖負責保證同一時刻只能有一個協程去操作臨界區,而信號量在創建時會接收一個初始值 value,可以保證同一時刻最多有 value 個協程去操作臨界區。

因此可以把鎖看成是初始值 value 等于 1 的信號量,它在源碼中的實現和鎖基本是類似的,我們也手動實現一下。

import asyncio
from collections import deque

class Semaphore:

    def __init__(self, value=1):
        self._waiters = deque()
        # 可以把 self._value 看成是令牌的數量
        # 每當一個協程進入臨界區,令牌數減 1,離開臨界區,令牌數加 1
        # 如果 self._value 小于等于 0,說明令牌用光了,此時就不允許進入臨界區
        self._value = value

    @property
    def locked(self):
        return self._value <= 0

    async def acquire(self):
        # 如果 self._value > 0,說明可以進入臨界區
        if not self.locked:
            self._value -= 1  # self._value 要減 1
            return True
        # 如果 self._value <= 0,說明此時不能進去臨界區,必須等待某個協程從臨界區出來
        # 那么和鎖一樣,也是創建一個 future 并放在雙端隊列里面
        future = asyncio.Future()
        self._waiters.append(future)
        # 此時獲取信號量的協程會陷入阻塞
        await future
        # 解除阻塞,意味著該協程獲取到信號量了
        self._value -= 1
        return True

    def release(self):
        # 釋放信號量,說白了就是將 self._value 加 1
        self._value += 1
        if len(self._waiters) == 0:
            return
        future = self._waiters.popleft()
        future.set_result(True)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.release()

信號量和鎖的實現方式是一樣的,鎖可以看成是 value 為 1 的信號量。當協程進入臨界區,value 的值會減少 1,離開臨界區 value 的值會增加 1。如果 value 為 0,那么后續協程就不允許進入臨界區了,必須等到某個協程從臨界區出來。

說到這,再來補充一個有界信號量,因為信號量有一個問題。

import asyncio
from asyncio import Semaphore
import time

async def bar(sem: Semaphore):
    async with sem:
        await asyncio.sleep(3)

async def main():
    # 每次允許兩個協程進入臨界區
    sem = Semaphore(2)
    # 創建 4 個任務
    task = [asyncio.create_task(bar(sem)) for _ in range(4)]
    # 直接對 sem 執行 release
    sem.release()
    sem.release()
    await asyncio.gather(*task)

start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print(f"總耗時: {end - start}")
"""
總耗時: 3.003426834
"""

創建了 4 個任務,每次只允許兩個協程進入臨界區,因此總耗時應該是 6 秒才對。但問題是我們創建完信號量之后,調用了兩次 release 方法,將內部的 value 值增加了 2,此時信號量就變成了同時允許 4 個協程進入臨界區。

因此和鎖不一樣,鎖一旦被釋放,就不能再二次釋放。而信號量被釋放,其實就是將內部的 value 加 1,并且不會對內部的 value 進行檢測。

import asyncio
from asyncio import Semaphore

async def main():
    sem = Semaphore(2)
    print(f"before value: {sem._value}")
    for _ in range(100):
        sem.release()
    print(f"after value: {sem._value}")

asyncio.run(main())
"""
before value: 2
after value: 102
"""

不過這個問題基本很少發生,當然也可以使用 async with 語句,這樣獲取和釋放一定是成對出現的。

而有界信號量在信號量的基礎上做了一層檢測,如果在 release 的時候發現 value 已經達到了初始值,那么會報錯。

圖片圖片

有界信號量會將初始值 value 單獨保存起來,如果釋放時發現 value 大于等于初始值,那么報錯。但是注意:有界信號量依舊可以多次 release,不過我們基本不會這么干,因為獲取和釋放應該是成對出現的。

以上我們就說完了協程里面的鎖和信號量,再來看看線程提供的。

線程鎖和信號量

線程鎖可以通過 threading 模塊創建。

import threading

lock = threading.Lock()

注意:Lock 并不是一個類,而是一個函數,看一下源代碼。

Lock = _allocate_lock
# threading.Lock() 其實就是 _thread.allocate_lock()
_allocate_lock = _thread.allocate_lock

調用 _thread.allocate_lock() 時會在內部創建鎖,而鎖是由 _thread 模塊實現的。

import threading
import _thread

lock = threading.Lock()
print(type(lock))
"""
<class '_thread.lock'>
"""
lock = _thread.allocate_lock()
print(type(lock))
"""
<class '_thread.lock'>
"""

所以線程鎖其實是一個 _thread.lock 對象。

補充一下,Python 有很多的模塊是由 C 實現的,因為它們和性能密切相關,編譯之后會內嵌在解釋器里面。舉個例子:

import random, _random
import re, _sre
import ssl, _ssl
import io, _io
import bisect, _bisect
import heapq, _heapq
import asyncio, _asyncio
import threading, _thread

這些 C 實現的模塊,名字前面一般會帶有一個下滑線,它們內嵌在解釋器里面,你在 Lib 目錄下是找不到的。但我們不需要直接使用這些模塊,解釋器會提供相應的 Python 模塊對其進行封裝。

我們只需要導入 Python 模塊即可,在內部會調用具體的 C 實現,以 io 模塊為例。

圖片圖片

這些類都是 _io 實現的,而 io 只是做了一層封裝,因此在實際編碼時會使用 C 實現的 _io 模塊里的邏輯。

再比如內置函數 open,它其實就是 io.open,而 io 里面的 open 是從 _io 導入進來的。

import io
import _io

print(open is io.open is _io.open)  # True

好了,說了這么多只是想表示線程鎖的具體實現不在 threading 里面,而是在 _thread 里面。_thread 是一個 C 實現的模塊,我們需要到解釋器里面才能看到具體實現。

在 Modules/_threadmodule.c 中,有一個結構體實例 Locktype,它便是 _thread.lock 這個類的底層實現。

圖片圖片

_thread.lock 實例化后會得到鎖,鎖在底層對應的是 lockobject 結構體。

// _threadmodule.c
typedef struct {
    PyObject_HEAD
    PyThread_type_lock lock_lock;
    PyObject *in_weakreflist;
    char locked;
} lockobject;
// pythread.h
typedef void *PyThread_type_lock;

解釋一下這個結構體。

PyObject_HEAD

每個對象都具備的頭部信息,它包含了對象的引用計數和類型。

lock_lock

PyThread_type_lock 是 void * 的類型別名,所以 lock_lock 是一個 void * 類型的指針,該指針指向了真正的鎖,這個鎖是底層操作系統提供的。

和協程鎖不同,由于操作系統感知不到協程,因此協程鎖是基于 Future 對象實現的。但線程鎖則是基于操作系統實現的,當 Python 代碼創建鎖、獲取鎖、解鎖時,會通過 lock_lock 指針將這些操作轉發到具體的鎖實現上。

in_weakreflist

用于創建弱引用,關于什么是弱引用,我在之前的文章中介紹過。

locked

用于標記鎖狀態,把它當成 Python 的布爾值即可,值為 1 表示鎖已被獲?。ㄒ焰i定),0 表示未被獲?。ㄎ存i定)。

這幾個字段應該很好理解,然后我們來看一下鎖的具體方法,那么方法都定義在哪呢?我們說過,實例對象有哪些行為,取決于類型對象定義了哪些操作。

因此鎖的操作都定義在 Locktype 里面,由內部的 tp_methods 字段負責維護。

圖片圖片

該字段被賦值為 lock_methods,所以鎖的方法都在 lock_methods 數組中。

圖片圖片

以上就是鎖能夠使用的方法,我們來驗證一下。

import threading

lock = threading.Lock()

# acquire_lock 和 acquire 基本是等價的
# release_lock 和 release 也基本是等價的
# 不過我們一般都會使用 acquire 和 lock
lock.acquire_lock()  # 獲取鎖
lock.release_lock()  # 釋放鎖

lock.acquire()  # 獲取鎖
lock.release()  # 釋放鎖

# 同理 locked_lock 和 locked 也是等價的
# 表示鎖是否被獲?。ㄒ焰i定),不過我們一般使用 locked
print(lock.locked_lock())
print(lock.locked())
lock.acquire()
print(lock.locked_lock())
print(lock.locked())
lock.release()
"""
False
False
True
True
"""
# 還提供了上下文管理,等價于 lock.acquire + lock.release
with lock:
    pass

好了,接下來我們看看 acquire 方法,也就是鎖是怎么獲取的。

static PyObject *
lock_PyThread_acquire_lock(
    lockobject *self, 
    PyObject *args, 
    PyObject *kwds
){
    _PyTime_t timeout;  // 超時時間
    // 一個枚舉,表示鎖狀態,有三個可選值
    // PY_LOCK_FAILURE:表示因為鎖已被持有,而獲取失敗
    // PY_LOCK_ACQUIRED:表示鎖可用,并成功獲取鎖
    // PY_LOCK_INTR:表示獲取鎖的操作被中斷,比如抵達超時時間
    PyLockStatus r;
    
    // 參數解析,該方法接收一個 timeout 參數
    if (lock_acquire_parse_args(args, kwds, &timeout) < 0)
        return NULL;
    
    // 獲取鎖,并指定一個超時時間,不傳則表示沒有超時時間
    // 那么在獲取不到鎖時,會無限等待
    r = acquire_timed(self->lock_lock, timeout);
    // 如果返回的狀態為 PY_LOCK_INTR,說明達到超時時間
    // 因此獲取鎖的操作被中斷,并且會拋出異常
    if (r == PY_LOCK_INTR) {
        return NULL;
    }
    // 如果返回的狀態為 PY_LOCK_ACQUIRED,表示鎖獲取成功
    // 將鎖的 locked 字段設置為 1,表示鎖已被獲取
    if (r == PY_LOCK_ACQUIRED)
        self->locked = 1;
    // 如果以上兩種狀態都不是,那么說明獲取失敗了
    // 將 r == PY_LOCK_ACQUIRED 轉成布爾值返回
    // 獲取成功返回 True,獲取失敗返回 False
    return PyBool_FromLong(r == PY_LOCK_ACQUIRED);
}

整個過程仍然很簡單,因此我們看到協程鎖和線程鎖的實現是類似的,它們都有一個 locked 字段用于表示鎖是否已被獲取。

只不過協程鎖是基于 Future 對象實現的,當 await future 陷入阻塞時,表示鎖已被其它協程獲取。當解除阻塞時,代表鎖被釋放了,自己獲取到鎖。

而線程鎖是基于操作系統實現的,它本質上是對操作系統提供的鎖做了一個封裝。Python 線程在獲取鎖時,底層會獲取操作系統的鎖。

而操作系統的鎖是怎么獲取的呢?在源碼中使用的是 acquire_time 函數,它接收一個指針和一個超時時間。該指針便是 lockobject 的 lock_lock 字段,類型是 void *,它指向了操作系統提供的鎖實現。

圖片圖片

acquire_time 函數做了一些參數處理后,又調用了 PyThread_acquire_lock_timed  函數,顯然獲取鎖的邏輯位于該函數里面。

PyThread_acquire_lock_timed 函數在不同平臺有著不同的實現,因為不同操作系統的鎖實現是不是一樣的,所以源碼中使用 void *。

圖片圖片

我們以 Windows 系統為例:

圖片圖片

雖然不同系統的函數實現不一樣,但參數是一致的。

  • aLock:void * 指針,指向操作系統提供的鎖;
  • microseconds:等待鎖的時間,以微妙為單位。如果值是負數,表示無限等待,直到獲取鎖;
  • intr_flag:如果設置為 1,那么當等待過程中出現了信號中斷時,函數會提前返回。

函數的核心實現如下:

圖片圖片

又調用了 EnterNonRecursiveMutex 函數,該函數是真正獲取鎖的邏輯,參數 aLock 指向了操作系統的互斥鎖。前面說過,不同系統有著不同的鎖實現,所以具體使用時需要轉換。在 Windows 系統上,它被轉成了 PNRMUTEX。

typedef struct _NRMUTEX
{   
    // 對操作系統互斥鎖的封裝
    PyMUTEX_T cs;
    // 對條件變量的封裝,用于線程間的同步
    // 允許線程在條件不滿足時等待,條件滿足時由其它線程通知等待的線程
    // 條件變量一般和互斥鎖一起使用,避免競爭條件和死鎖
    PyCOND_T cv;
    // 標記互斥鎖是否已被獲取,1 表示已被獲取,0 表示未被獲取
    int locked;
} NRMUTEX;
typedef NRMUTEX *PNRMUTEX;

所以 lockobject 的 lock_lock 指針指向的其實依舊不是 OS 互斥鎖,而是一個結構體實例,結構體內部的字段 cs 封裝的才是 OS 互斥鎖。

圖片圖片

lockobject 是線程鎖,也就是 Python 代碼中使用的鎖的底層實現,而 NRMUTEX 則是封裝了操作系統提供的互斥鎖。注意這里面的兩個 locked,它們都用于標記鎖是否已被獲取。

最后來看看 EnterNonRecursiveMutex 函數的具體邏輯。

DWORD
EnterNonRecursiveMutex(PNRMUTEX mutex, 
                       DWORD milliseconds)
{
    
    DWORD result = WAIT_OBJECT_0;
    // 對 OS 互斥鎖進行鎖定,用于保護共享數據,如果鎖定失敗直接返回
    if (PyMUTEX_LOCK(&mutex->cs))
        return WAIT_FAILED;
    // 如果鎖定成功,那么將 locked 字段設置為 1,表示互斥鎖被獲取
    // 但如果發現 locked 已經為 1 了,則說明已有別的線程將 locked 修改為 1
    // 那么當前線程就要等待,直到 locked 不為 1(鎖被釋放)
    if (milliseconds == INFINITE) {
        // 無限等待
        while (mutex->locked) {
            if (PyCOND_WAIT(&mutex->cv, &mutex->cs)) {
                result = WAIT_FAILED;
                break;
            }
        }
    } else if (milliseconds != 0) {
        // 有時間限制的等待
        ULONGLONG now, target = GetTickCount64() + milliseconds;
        while (mutex->locked) {
            if (PyCOND_TIMEDWAIT(
                &mutex->cv, &mutex->cs, 
                (long long)milliseconds*1000) < 0) 
            {
                result = WAIT_FAILED;
                break;
            }
            now = GetTickCount64();
            if (target <= now)
                break;
            milliseconds = (DWORD)(target-now);
        }
    }
    // 在被喚醒之后,說明當前線程獲取互斥鎖成功,于是將 locked 改成 1
    if (!mutex->locked) {
        mutex->locked = 1;
        result = WAIT_OBJECT_0;
    } else if (result == WAIT_OBJECT_0)
        result = WAIT_TIMEOUT;
    // 這里必須將操作系統的鎖釋放掉,因為對于外界的線程而言,
    // 鎖是否被獲?。ㄦi定),取決于 locked 字段是否為 1
    PyMUTEX_UNLOCK(&mutex->cs); 
    return result;
}

代碼邏輯有一些讓人疑惑的地方,下面解釋一下。Python 里面調用 lock.acquire() 方法時,表示要獲取線程鎖。但獲取線程鎖之前,要先獲取 OS 互斥鎖,如果獲取不到,那么壓根不允許進入臨界區。

但解釋器在互斥鎖的基礎上又封裝了一層,如果獲取到了互斥鎖,還要將 locked 字段修改為 1。因為從代碼邏輯上講,無論是線程鎖還是互斥鎖,只有當它們內部的 locked 字段為 1 時,才算是獲取了鎖。

所以將互斥鎖的 locked 字段修改為 1 之后,后續還要將線程鎖的 locked 字段修改為 1,這樣才算是獲取了線程鎖。

到這里估計可能有人會產生一個疑問,為啥函數在一開始要獲取系統的互斥鎖,最后又釋放掉,這豈不是多此一舉?

if (PyMUTEX_LOCK(&mutex->cs))
        return WAIT_FAILED;
    //...
    PyMUTEX_UNLOCK(&mutex->cs);

直接檢測 locked 字段是否等于 1 不就行了嗎?其實原因有三個:

  • 保護共享狀態:操作系統的互斥鎖 mutex-> cs 用于保護共享狀態 mutex -> locked 的讀寫,在多線程環境中,任何對共享狀態的訪問都要同步,以防止競態條件;
  • 條件變量的同步:在使用條件變量 mutex -> cv 時,通常需要結合互斥鎖使用,條件變量的等待和通知需要在互斥鎖的保護下進行,以保證操作的原子性;
  • 避免忙等待:如果只使用 mutex -> locked 進行檢查,可能會陷入忙等待,即不斷地檢查鎖狀態而占用 CPU 資源。使用互斥鎖和條件變量可以讓線程在等待時被掛起,從而更有效地利用 CPU;

所以解釋器為 OS 互斥鎖引入了一個自定義的鎖狀態 locked,OS 互斥鎖提供了對 locked 的基本保護,因為多個線程都要修改它。而自定義的鎖狀態 locked 則用于實現同步邏輯,如果 locked 為 1,我們就認為鎖被獲取了,locked 為 0,鎖就沒有被獲取。

協程鎖和線程鎖都是如此,所謂的獲取鎖、釋放鎖都是在修改 locked 字段的值。只不過在等待的時候,協程鎖使用的是 Future 對象,而線程鎖使用的是操作系統提供的互斥鎖和條件變量。

所以上面代碼中的 PyMUTEX_LOCK 通過之后,還要檢測 locked 字段是否等于 1,代碼片段如下。

while (mutex->locked) {
            if (PyCOND_WAIT(&mutex->cv, &mutex->cs)) {
                result = WAIT_FAILED;
                break;
            }
        //...

如果 locked 是 1,說明互斥鎖已經被獲取了,當前線程要進行等待,直到 locked 字段的值為 0。當其它線程釋放鎖時,會將 locked 字段修改為 0,并通過條件變量喚醒當前線程。

該線程醒來后檢測到 locked 為 0,就知道互斥鎖已被釋放,自己可以獲取了,于是再將 locked 字段修改為 1。

說完了線程鎖的獲取,再來看看線程鎖的釋放,所謂釋放,其實就是將 locked 字段修改為 0 而已。

圖片圖片

釋放互斥鎖的邏輯最終會調用如下函數:

圖片圖片

修改 locked 是不安全的,需要加鎖保護。所以 OS 互斥鎖就是為了保護 locked 變量的修改,再配合條件變量實現阻塞等待以及自動喚醒,但從代碼邏輯上講,將 locked 字段設置為 0,才算是真正釋放了鎖。

這部分邏輯稍微有點繞,總之記住一個重點:所謂的鎖,它的核心就是結構體的一個字段,這里是 locked。如果字段的值為 1,表示鎖被獲取了,字段的值為 0,表示鎖沒有被獲取。

  • 而獲取鎖,本質上就是將 locked 字段修改為 1;
  • 而釋放鎖,本質上就是將 locked 字段修改為 0;

當鎖沒有被獲取時,那么線程在獲取鎖和釋放鎖時的邏輯可以簡化為如下:

圖片圖片

但實際情況會有多個線程一起競爭鎖,因此為了保護這個共享字段,以及實現阻塞等待和自動喚醒,解釋器使用了操作系統的互斥鎖和條件變量。

小結

以上我們就剖析了協程鎖、信號量以及線程鎖的實現原理,至于線程里面的信號量,它的原理和協程的信號量是一樣的,只是實現方式不一樣。

圖片圖片

線程的信號量包含了一個初始值 value,但它在實現阻塞等待以及喚醒的時候用的是條件變量,而條件變量的實現依賴于鎖。簡單來說,獲取信號量的時候,self._value 會減 1,釋放信號量的時候,self._value 會加 1。

當 self._value 為 0 時,獲取信號量會陷入阻塞,而當某個線程退出臨界區釋放信號量的時候,會通過條件變量的 notify 機制喚醒阻塞的線程。

關于條件變量,我們以后再分析,有點餓了。

另外進程也有鎖和信號量,這里也先不討論了,有點困了。

責任編輯:武曉燕 來源: 古明地覺的編程教室
相關推薦

2024-10-29 15:23:45

Python線程安全

2025-06-03 00:00:02

Go協程鎖機制

2023-12-05 13:46:09

解密協程線程隊列

2016-11-23 16:08:24

Python處理器分布式系統

2025-04-16 08:50:00

信號量隔離線程池隔離并發控制

2024-07-25 11:53:53

2009-12-08 12:14:43

2020-11-10 15:25:26

SemaphoreLinux翻譯

2010-07-15 15:32:10

Perl線程

2010-04-21 16:25:13

Unix信號量

2010-04-21 16:42:48

Unix信號量

2020-09-22 07:35:06

Linux線程進程

2017-05-11 14:05:25

Consul分布式信號量

2010-03-16 17:52:27

Java多線程信號量

2021-04-13 09:20:15

鴻蒙HarmonyOS應用開發

2010-04-21 16:50:31

Unix信號量

2020-11-05 09:59:24

Linux內核信號量

2023-12-08 07:40:07

并發控制

2025-04-23 11:00:00

Hystrix隔離模式信號量

2025-06-10 02:00:00

Golangmap
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久草视频观看 | 亚洲影视在线 | 日韩视频一区二区 | 久久久久久毛片免费观看 | 国产精品视频免费观看 | 免费在线观看一区二区三区 | 久久婷婷色| 中文字幕免费观看 | 亚洲精品一区国语对白 | 亚洲一级淫片 | 亚洲精品v | 97超在线视频 | 一区二区三区视频在线观看 | 欧美一区二区三区在线观看 | 欧美成视频 | 亚洲成人激情在线观看 | 国产一区不卡在线观看 | 精品日韩一区二区 | 欧美一区不卡 | 作爱视频免费看 | 在线日韩视频 | 在线观看中文字幕dvd播放 | 欧美一区二区 | 日韩欧美中文 | 伊人精品在线视频 | 国产传媒在线播放 | 91视视频在线观看入口直接观看 | 91九色婷婷 | 亚洲人成一区二区三区性色 | 日韩视频国产 | 成人av一区二区三区 | 亚洲视频中文字幕 | 在线āv视频 | 日韩精品区 | 国产免费一级一级 | 欧美精品一区二区三区在线播放 | 成人免费视频网站在线观看 | 美国a级毛片免费视频 | 一区二区三区四区在线视频 | 天堂久久天堂综合色 | 久久99国产精一区二区三区 |