Go 協程鎖機制的實現
在日常的開發中,我們經常會通過高并發來提高系統的處理能力,高并發帶來的一個問題就是對公共資源的訪問,這時候必須使用一些互斥機制保證數據的正確性。比如數據庫的鎖機制,或者我們基于 Redis、Zookeeper 等中間件實現的分布式鎖機制,線程/進程間的鎖機制等,而在 Go 中,我們還會看到有協程的鎖機制。本文主要分析 Go 中協程鎖機制的實現。
鎖的實現方式通常是首先通過 CAS 嘗試獲得鎖,如果成功則直接返回,如果獲取失敗可以選擇自旋或者把當前的實體加入鎖的等待隊列并把當前實體改成阻塞狀態,然后觸發重新調度,等待鎖的持有者解鎖然后喚醒等待者。但是比較有意思的是"把當前的實體加入鎖的等待隊列"這個問題,比如有多個線程需要操作公共資源 A,當 A 被某個線程加鎖成功后,其余的加鎖失敗的多個線程都需要加入鎖 A 的等待隊列 Q1,這里也涉及到公共資源的訪問,所以對 Q1 的訪問也會執行 CAS 加鎖,失敗后加入等待隊列 Q2,形成了套娃。后面我們會看到這個套娃是如何解決的。
協程間的鎖機制
在 Go 中,我們可以通過 Mutex 來實現多個協程對公共資源的訪問,它的使用方式很簡單,但是實現相對來說復雜很多,因為 Go 做了很多性能優化,比如自旋,解鎖時喚醒的公平性。下面看一個例子。
package main
import (
"sync"
)
func main() {
var m sync.Mutex
m.Lock()
// 訪問公共資源
m.Unlock()
}
鎖機制的 API 通常不會很復雜,一般就是 lock/unlock,多一點的就是 trylock 或帶 timeout 的 lock。接著看一下這簡單使用方式的背后 Mutex 是如何實現的。Mutex 的定義如下。
type Mutex struct {
mu isync.Mutex
}
func (m *Mutex) Lock() {
m.mu.Lock()
}
可以看到,Mutex 是對 isync.Mutex 的封裝,接著看 isync.Mutex Lock 的實現。
type Mutex struct {
state int32
sema uint32
}
func (m *Mutex) Lock() {
// 獲取鎖,成功就直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
m.lockSlow()
}
Lock 首先用 CAS 獲得鎖,成功則直接返回,失敗時,Go 在獲得鎖失敗時會做一系列的優化處理,我們只關注阻塞協程的部分。
func (m *Mutex) lockSlow() {
iter := 0
old := m.state
for {
// 鎖被其他協程持有 & 不是饑餓模式 & 可以自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 自旋
runtime_doSpin()
iter++
old = m.state
continue
}
// 嘗試獲得鎖
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 成功
if old&(mutexLocked|mutexStarving) == 0 {
break// locked the mutex with CAS
}
// 獲取失敗則阻塞
runtime_SemacquireMutex(&m.sema, queueLifo, 2)
iter = 0
}
}
}
Go 在獲得鎖失敗時會先嘗試自旋,如果還是失敗則調用 runtime_SemacquireMutex(&m.sema, queueLifo, 2) 把協程加入等待隊列并阻塞協程。runtime_SemacquireMutex 對應函數如下。
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
gp := getg()
// 獲取一個 sudog
s := acquireSudog()
// semtable 是一個數組,每個元素對應一棵樹,根據 addr 計算哈希出所屬的數組索引,并拿到該索引對應的樹的根節點
root := semtable.rootFor(addr)
// 對樹進行加鎖,因為需要把 sodog 插入樹中
lockWithRank(&root.lock, lockRankRoot)
// 把 sudog 插入樹中,sudog 記錄了 addr 和當前的 g
root.queue(addr, s, lifo)
// 把協程改成阻塞狀態,調度其他協程執行
goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
}
semacquire1 的邏輯是首先獲得一個樹的鎖,然后把當前 addr 和 g 封裝成 sudog 插入樹中,最終把協程改成阻塞狀態,重新觸發調度。鎖的持有者解鎖后會喚醒鎖的等待者。
func semrelease1(addr *uint32, handoff bool, skipframes int) {
// 根據地址找到樹的根節點
root := semtable.rootFor(addr)
// 加鎖操作樹
lockWithRank(&root.lock, lockRankRoot)
// 從樹中獲取 addr 對應的 sudog
s, t0, tailtime := root.dequeue(addr)
unlock(&root.lock)
// 修改協程為 ready 狀態,等待調度執行
readyWithTime(s, 5+skipframes)
}
通過前面的分析可以看到 Go Mutex 的實現并不是直接使用多線程的鎖機制,而是 Go 自己實現的,因為直接使用多線程的鎖會導致一個協程阻塞引起整個線程阻塞。Go Mutex 的實現原理大致是首先通過 CAS 獲取鎖,成功則返回,失敗則獲取一個全局數據結構(樹)的鎖,把當前 g 插入樹中等待喚醒。那么這個全局數據結構的鎖是如何獲取的呢?這就涉及到線程間的鎖機制了。
線程間的鎖機制
剛才加鎖全局數據結構的函數是 lockWithRank。
func lockWithRank(l *mutex, rank lockRank) {
lock2(l)
}
func lock2(l *mutex) {
gp := getg()
k8 := key8(&l.key)
// 直接加鎖成功
v8 := atomic.Xchg8(k8, mutexLocked)
if v8&mutexLocked == 0 {
if v8&mutexSleeping != 0 {
atomic.Or8(k8, mutexSleeping)
}
return
}
// 創建一個線程級的加鎖數據結構,比如線程互斥結構體
semacreate(gp.m)
v := atomic.Loaduintptr(&l.key)
tryAcquire:
for i := 0; ; i++ {
// 判斷加鎖可以加鎖并且加鎖成功
if v&mutexLocked == 0 {
prev8 := atomic.Xchg8(k8, mutexLocked|mutexSleeping)
if prev8&mutexLocked == 0 {
timer.end()
return
}
v = atomic.Loaduintptr(&l.key)
continue tryAcquire
}
// 阻塞
semasleep(-1)
v = atomic.Loaduintptr(&l.key)
}
}
可以看到 lockWithRank 最終類似協程加鎖的流程,先嘗試加鎖,成功則返回,失敗則進入阻塞流程,但是這個阻塞流程和協程的阻塞流程是不一樣的。看一下 semasleep 的實現,semasleep 在不同系統下實現不一樣,比如 MacOS 下是:
func semasleep(ns int64) int32 {
g := getg()
mp := g.m
pthread_mutex_lock(&mp.mutex)
for {
// 已經解鎖了,則返回
if mp.count > 0 {
mp.count--
pthread_mutex_unlock(&mp.mutex)
return0
}
// 阻塞等待條件變量的喚醒
pthread_cond_wait(&mp.cond, &mp.mutex)
}
}
Linux 下是:
func semasleep(ns int64) int32 {
mp := getg().m
for v := atomic.Xadd(&mp.waitsema, -1); ; v = atomic.Load(&mp.waitsema) {
ifint32(v) >= 0 {
return0
}
futexsleep(&mp.waitsema, v, ns)
if ns >= 0 {
ifint32(v) >= 0 {
return0
} else {
return-1
}
}
}
}
func futexsleep(addr *uint32, val uint32, ns int64) {
if ns < 0 {
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
return
}
var ts timespec
ts.setNsec(ns)
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}
MacOS 是通過 C 庫函數實現,Linux 下則是通過系統調用實現的。那么 C 庫和 Futex 是如何實現線程的鎖機制的呢?它們的實現有什么區別?
暫時沒有找到 MacOS 下的 C 庫實現,下面是早期 linuxthreads 庫實現的線程鎖機制。
int __pthread_mutex_lock(pthread_mutex_t * mutex)
{
pthread_t self;
while(1) {
// 加鎖, acquire 實現是 while (testandset(spinlock)) sched_yield();
acquire(&mutex->m_spinlock);
switch(mutex->m_kind) {
case PTHREAD_MUTEX_FAST_NP:
// 如果還沒有加鎖則加鎖直接返回
if (mutex->m_count == 0) {
mutex->m_count = 1;
release(&mutex->m_spinlock);
return0;
}
self = thread_self();
break;
default:
return EINVAL;
}
// 獲取失敗,需要阻塞,把當前線程插入該互斥鎖的等待隊列
enqueue(&mutex->m_waiting, self);
release(&mutex->m_spinlock);
// 掛起等待喚醒
suspend(self);
}
}
linuxthreads 的實現是首先通過自旋鎖獲取一個鎖,這個鎖是用于互斥訪問 mutex 數據結構,對 mutex 加鎖成功則返回,失敗則把當前線程加入 mutex 的等待隊列,然后再通過 suspend 阻塞在 SIGUSR1 信號,借助操作系統掛起當前線程,然后解鎖時發送 SIGUSR1 信號喚醒等待線程。
int __pthread_mutex_unlock(pthread_mutex_t * mutex)
{
pthread_t th;
// 獲取 mutex 的鎖
acquire(&mutex->m_spinlock);
switch (mutex->m_kind) {
case PTHREAD_MUTEX_FAST_NP:
mutex->m_count = 0;
break;
default:
return EINVAL;
}
// 取出一個被阻塞的線程
th = dequeue(&mutex->m_waiting);
release(&mutex->m_spinlock);
// 發送信號喚醒它
if (th != NULL) restart(th);
return0;
}
可以看到 linuxthreads 庫是通過自旋鎖解決了套娃的問題。而 glibc 的線程鎖機制則是借助 futex 實現的。
int pthread_mutex_lock (pthread_mutex_t *mutex)
{
// 獲取鎖類型
unsignedint type = PTHREAD_MUTEX_TYPE_ELISION (mutex);
// 普通鎖
if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
{
LLL_MUTEX_LOCK_OPTIMIZED (mutex);
}
// 加鎖成功,記錄鎖的持有者
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
mutex->__data.__owner = id;
return0;
}
pthread_mutex_lock 的核心邏輯是通過 LLL_MUTEX_LOCK_OPTIMIZED 實現的。
# define LLL_MUTEX_LOCK_OPTIMIZED(mutex) lll_mutex_lock_optimized (mutex)
static inline void lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{
intprivate = PTHREAD_MUTEX_PSHARED (mutex);
lll_lock (mutex->__data.__lock, private);
}
#define lll_lock(futex, private) \
__lll_lock (&(futex), private)
#define __lll_lock(futex, private) \
((void) \
({ \
int *__futex = (futex); \
if (__glibc_unlikely \
(atomic_compare_and_exchange_bool_acq (__futex, 1, 0))) \
{ \
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
__lll_lock_wait_private (__futex); \
else \
__lll_lock_wait (__futex, private); \
} \
}))
void __lll_lock_wait (int *futex, intprivate)
{
if (atomic_load_relaxed (futex) == 2)
goto futex;
while (atomic_exchange_acquire (futex, 2) != 0)
{
futex:
futex_wait ((unsignedint *) futex, 2, private); /* Wait if *futex == 2. */
}
}
static inline int
futex_wait (int *futexp, int val)
{
#ifdef __NR_futex
return syscall (__NR_futex, futexp, FUTEX_WAIT, val);
#else
return syscall (__NR_futex_time64, futexp, FUTEX_WAIT, val);
#endif
}
pthread_mutex_lock 通過原子操作進行加鎖,成功則返回,失敗則通過 futex 實現阻塞和喚醒。
Futex
Futex 是通過系統實現線程阻塞喚醒的一種方式,基于 futex 實現的鎖邏輯為首先通過 CAS 加鎖,成功后直接返回,失敗則通過 futex 陷入系統把當前線程改成阻塞狀態,然后鎖的持有者解鎖時再通過 futex 喚醒等待者。2.6.11 版本的 Futex 實現如下。
static int futex_wait(unsigned long uaddr, int val, unsigned long time)
{
// 等待隊列的節點
wait_queue_t wait = {
.task = current, // 當前線程
.func = default_wake_function,
.task_list = { NULL, NULL }
}
int ret, curval;
struct futex_q q;
retry:
// 加鎖內存映射區域
down_read(¤t->mm->mmap_sem);
// 根據 uaddr 計算 key
ret = get_futex_key(uaddr, &q.key);
// 插入隊列
queue_me(&q, -1, NULL);
up_read(¤t->mm->mmap_sem);
// 把當前線程改成阻塞狀態
__set_current_state(TASK_INTERRUPTIBLE);
// 把當前線程插入 futex 結構體的等待隊列
add_wait_queue(&q.waiters, &wait);
if (likely(!list_empty(&q.list)))
time = schedule_timeout(time);
__set_current_state(TASK_RUNNING);
}
futex_wait 首先把 futex 結構體插入到隊列中,然后把當前線程插入到 futex 結構體的隊列中,最后把當前線程改成阻塞狀態并重新進行調度。那么把 futex 結構體插入到隊列中又必然會涉及到并發訪問公共資源的問題,看看 queue_me 是如何解決的。
static void queue_me(struct futex_q *q, int fd, struct file *filp)
{
struct futex_hash_bucket *bh;
q->fd = fd;
q->filp = filp;
init_waitqueue_head(&q->waiters);
get_key_refs(&q->key);
bh = hash_futex(&q->key);
q->lock_ptr = &bh->lock;
spin_lock(&bh->lock);
bh->nqueued++;
list_add_tail(&q->list, &bh->chain);
spin_unlock(&bh->lock);
}
queue_me 先通過 key 從一個全局的結構體數組中計算出對應的索引,這個索引對應的元素是一條鏈表(減少并發時的競爭),然后加鎖并把 futex 結構體插入該鏈表中。這里使用的是自旋鎖,那么這里的鎖又是怎么加的呢?
#define spin_lock(lock) _spin_lock(lock)
#define _spin_lock(lock) \
do { \
preempt_disable(); \ // 關系統搶占
_raw_spin_lock(lock); \
__acquire(lock); \
} while(0)
_raw_spin_lock 在不同的架構中實現不一樣。比如非 SMP 架構下因為關閉了系統搶占,所以 _raw_spin_lock 的實現實際上什么都不需要做,因為不會發生并發問題。
#define _raw_spin_lock(lock) do { (void)(lock); } while(0)
而 SMP 架構下,可能存在多個 CPU 訪問該數據結構,所以需要真正的加鎖。
static inline void _raw_spin_lock(spinlock_t *lock)
{
__asm__ __volatile__(
spin_lock_string
:"=m" (lock->slock) : : "memory");
}
#define spin_lock_string \
"\n1:\t" \
"lock ; decb %0\n\t" \
"jns 3f\n" \
"2:\t" \
"rep;nop\n\t" \
"cmpb $0,%0\n\t" \
"jle 2b\n\t" \
"jmp 1b\n" \
"3:\n\t"
以上代碼大概是通過原子操作實現加鎖。
通過分析可以看到,在 Go 中,協程鎖機制的實現是由 Go 自己實現的,但是在實現中需要借助線程的鎖機制來實現協程的阻塞等待和喚醒,而線程的鎖機制又需要通過操作系統的 futex 來實現線程的阻塞等待和喚醒,而系統的等待和喚醒機制同樣需要通過鎖機制來實現,具體實現在不同架構下不一樣,比如在非 SMP 架構中,只需要關系統搶占即可,這樣在執行操作系統代碼時就不會有并發代碼訪問該公共數據結構(需要保證中斷中也不會訪問該數據結構,否則也需要關中斷),而在 SMP 架構下是通過自旋鎖來實現的,這樣就解決了套娃的問題。