慢聊Golang協程池Ants實現原理
大家都知道goroutine 是 Go語言中的輕量級線程實現,由 Go 運行時(runtime)管理,Go 程序會智能地將 goroutine 中的任務合理地分配給每個 CPU。創建一個goroutine大小大概在2k左右,可以說非常的節省機器資源。
但是為什么要用池化的方式呢?機器資源總是有限的,如果創建了幾十萬個goroutine,那么就消耗比較大了,在一些需要對并發資源進行控制、提升性能、控制生命周期的場景中,還是需要用到協程池去處理。
今天就介紹在github用Go語言實現的有 11.5k?的 Ants 協程池庫的實現!
圖片
初識Ants
Ants介紹
Go的協程非常輕量,但是在超高并發場景,每個請求創建一個協程也是低效的,一個簡單的思想就是協程池。
Ants實現了一個具有固定容量的goroutine池,管理和回收大量goroutine,允許開發人員限制并發程序中的goroutines數量。
圖片
Github地址:https://github.com/panjf2000/ants
這是在github上的截圖,注意不同版本之間代碼實現會略有差異。
圖片
特性
??Ants具有如下特性:
- ? 自動管理和回收大量goroutine
- ? 定期清除過期的goroutines
- ? 豐富的API:提交任務,獲取運行goroutine的數量,動態調整池的容量,釋放池,重新啟動池
- ? 優雅地處理死機以防止程序崩潰
- ? 高效的內存使用,甚至比Golang中的無限goroutine實現了更高的性能
- ? 非阻塞機制
核心概念
- ? Pool :Ants協程池核心結構
- ? WorkerArray:Pool池中的worker隊列,存放所有的Worker
- ? goWorker:運行任務的實際執行者,它啟動一個 goroutine 來接受任務并執行函數調用
- ? sync.Pool:golang 標準庫下并發安全的對象池,緩存申請用于之后的重用,以減輕GC的壓力
- ? spinLock:基于CAS機制和指數退避算法實現的一種自旋鎖
運行流程圖
Ants運行流程圖如下:
圖片
前置知識
自旋鎖 spinLock
我們先了解下什么是自旋鎖!
加鎖的目的就是保證共享資源在任意時間里,只有一個線程訪問,而自旋鎖加鎖失敗后,線程會忙等待,直到它拿到鎖。
圖片
如果要實現鎖的話需要實現Go 標準庫sync的Locker接口
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
Ants的自旋鎖是基于CAS機制和指數退避算法實現的一種自旋鎖,主要利用了下面幾個關鍵的點:
- ? sync.Locker接口
- ? 指數退避算法
- ? sync. atomic 原子包中的方法了解
- ? runtime.Gosched() 讓當前goroutine讓出CPU時間片
?? Go語言中 sync/atomic包提供了底層的原子級內存操作,可實用CAS 函數(Compare And Swap)
?? 指數退避算法以指數方式重試請求,請求失敗后重試間隔分別是 1、2、4 ...,2的n次方秒增加
我們看下具體實現代碼和添加的注釋:
//實現Locker接口
type spinLock uint32
//最大回退次數
const maxBackoff = 16
// 加鎖
func (sl *spinLock) Lock() {
backoff := 1
//基于CAS機制,嘗試獲取鎖
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
//執行backoff次 cpu讓出時間片次數
for i := 0; i < backoff; i++ {
//使當前goroutine讓出CPU時間片
runtime.Gosched()
}
if backoff < maxBackoff {
//左移后賦值 等于 backoff = backoff << 1
//左移一位就是乘以 2的1次方
backoff <<= 1
}
}
}
//釋放鎖
func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}
Gosched()使當前goroutine程放棄處理器,以讓其它goroutine運行,它不會掛起當前goroutine,因此當前goroutine未來會恢復執行。
?? backoff <<= 1 這段代碼會有你知道什么意思嗎?
這是Go語言的位運算符 << 表示左移n位就是乘以2的n次方, 而 <<= 表示左移后賦值。
??代碼中 backoff <<= 1 其實就是 backoff = backoff << 1,這是左移一位的結果就是 backoff = backoff * 2^1。
自旋鎖執行 backoff 次讓 cpu 時間片動作,次數分別是 1、2、4 ...,封頂16
Ants自旋鎖邏輯用圖表示如下:
圖片
核心數據結構
這里簡單介紹下三個核心的結構體和屬性:
圖片
Pool結構體
Pool就是協程池的實際結構,在下面代碼中已經標記了注釋。
type Pool struct {
// 協程池容量
capacity int32
// 當前協程池中正在運行的協程數
running int32
// ants 實現的自旋鎖,用于同步并發操作
lock sync.Locker
// 存放一組Worker
workers workerArray
// 協程池狀態 (1-關閉、0-開啟)
state int32
// 并發協調器,用于阻塞模式下,掛起和喚醒等待資源的協程
cond *sync.Cond
// worker 對象池
workerCache sync.Pool
// 等待的協程數量
waiting int32
// 回收協程是否關閉
heartbeatDone int32
// 閉回收協程的控制器函數
stopHeartbeat context.CancelFunc
// 協程池的配置
options *Options
}
這里對幾個配置著重講一下:
workerCache :這是sync.Pool類型,主要作用保存和復用臨時對象,減少內存分配,降低 GC 壓力,在Ants中是為了緩存釋放的 Worker 資源
options:可配置化過期時間、是否支持預分配、最大阻塞數量、panic 處理、日志,這里是通過函數式選項模式進行實現的
goWorker
goWorker 是運行任務的實際執行者,它啟動一個 goroutine 來接受任務并執行函數調用,這個協程是一個長期運行不會被主動回收的。
type goWorker struct {
//goWorker 所屬的協程池
pool *Pool
//接收實際執行任務的管道
task chan func()
//goWorker 回收到協程池的時間
recycleTime time.Time
}
WorkerArray
workerArray 是一個接口( interface),其實現包含 stack 棧版本和 queue 隊列兩種實現。
圖片
它定義了幾個通用和用于回收過期 goWorker 的 api
type workerArray interface {
// worker 列表長度
len() int
// 是否為空
isEmpty() bool
// 插入一個goworker
insert(worker *goWorker) error
// 從WorkerArray獲取可用的goworker
detach() *goWorker
// 清理pool.workers中的過期goworker
retrieveExpiry(duration time.Duration) []*goWorker
// 重置,清空WorkerArray中所有的goWorker
reset()
}
核心方法
這是核心實現代碼的走讀部分,基本上都有進行了注釋,看起來可能會有點不怎么理解,多看兩遍就好,相信我 ????!
創建Pool
創建Pool其實就是New一個Pool實例,對Pool中結構體的屬性進行初始化、加載一些配置,這種方式很常見,大家可以注意觀察積累。
圖片
代碼實現和注釋如下:
func NewPool(size int, options ...Option) (*Pool, error) {
//讀取一些自定義的配置
opts := loadOptions(options...)
...
// 創建 Pool 對象
p := &Pool{
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
// 指定 sync.Pool 創建 worker 的方法
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
// 初始化Pool時是否進行內存預分配
// 區分workerArray 的實現方式
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
// 預先分配固定 Size 的池子
p.workers = newWorkerArray(loopQueueType, size)
} else {
// 初始化不創建,運行時再創建
p.workers = newWorkerArray(stackType, 0)
}
p.cond = sync.NewCond(p.lock)
// 開啟一個goroutine清理過期的 worker
go p.purgePeriodically()
return p, nil
}
workerChanCap:確定工作程序的通道是否應為緩沖通道,當獲取給GOMAXPROCS設置的值等于1時表示單核執行,此時的通道是無緩沖通道,否則是有緩沖通道,且容量是1。
這里講的是默認未進行預分配,采用 workerStack 棧實現workerArray的初始化。
清理過期goWorker
在初始化好Pool結構屬性后,會開啟一個goroutine清理過期的 worker。
??怎么判定goroutine是過期的?
Ants過期的定義是:每個 goWorker的 recycleTime 加上用戶配置的過期時間 Pool.options.ExpiryDuration 小于 time.Now() 時即認為該協程已過期。
我們看下具體流程
func (p *Pool) purgePeriodically(ctx context.Context) {
// ExpiryDuration 默認是1s
heartbeat := time.NewTicker(p.options.ExpiryDuration)
...
for {
select {
case <-heartbeat.C:
case <-ctx.Done():
return
}
// pool關閉
if p.IsClosed() {
break
}
// 從 workers 中獲取過期的 worker
p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
p.lock.Unlock()
// 清理過期的worker
for i := range expiredWorkers {
expiredWorkers[i].task <- nil
expiredWorkers[i] = nil
}
// 喚醒所有等待的線程
if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
p.cond.Broadcast()
}
}
}
??清理流程如下:
- 1. 取出過期的goWorker
- 2. 通知 goWorker 退出,方式是向過期 goWorker 的 task channel 發送一個 nil
- 3. 接收值為 nil 的任務后goWorker會退出
- 4. 所有工作程序都已清理完畢,可能這時還有 goroutine 阻塞在cond.Wait上,會調用 p.cond.Broadcast() 喚醒這些 goroutine
Submit任務提交
在初始化完成Pool之后,就需要往池中提交帶執行任務了,Pool提供了 Submit 方法,提供外部發起提交任務的接口。
func (p *Pool) Submit(task func()) error {
// pool是否關閉
if p.IsClosed() {
return ErrPoolClosed
}
var w *goWorker
// 嘗試獲取空閑的goWorker
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
}
// 發送到 goWorker的channel中
w.task <- task
return nil
}
獲取可用goWork
Submit方法內部調用 pool.retrieveWorker 方法并嘗試獲取一個空閑的 goWorker,如果獲取成功會將任務發送到goWorker的channel類型task中。
func (p *Pool) retrieveWorker() (w *goWorker) {
//創建一個新的goWorker,并執行
spawnWorker := func() {
//實例化 worker
w = p.workerCache.Get().(*goWorker)
// 運行
w.run()
}
// 加鎖
p.lock.Lock()
// 從workers 中取出一個 goWorker
// workerStack 實現了p.workers的方法
w = p.workers.detach()
if w != nil {
p.lock.Unlock()
// Pool容量大于正在工作的 goWorker 數量)
//則調用 spawnWorker() 新建一個 goWorker
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
spawnWorker()
} else {
// options設置了非阻塞選項,直接返回 nil
if p.options.Nonblocking {
p.lock.Unlock()
return
}
retry:
//option設置了最大阻塞隊列,當前阻塞等待的任務數量已經達設置上限,直接返回 nil
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
...
var nw int
//如果正在執行的worker數量為0時,則重新創建woker
if nw = p.Running(); nw == 0 {
p.lock.Unlock()
spawnWorker()
return
}
//p.workers中獲取可用的worker
//執行開頭創建的spawnWorker
if w = p.workers.detach(); w == nil {
if nw < p.Cap() {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
p.lock.Unlock()
}
return
}
??看完注釋后理一理retrieveWorker的執行邏輯:
- 1. 聲明一個spawnWorker,從對象池 workerCache 中獲取 goWorker
- 2. 嘗試從 workers 中取出可用的 goWorker
- 3. 如未達到協程池的容量限制,獲取并啟動 spawnWorker(goWorker)
- 4. 如何用戶設置了非阻塞選項,直接返回空的goWorker
- 5. 如果正在執行的goWorker 的數量等于0,調用 spawnWorker()
- 6. 未獲取到goWorker,并且Pool容量未滿,同樣調用 spawnWorker()
?? spawnWorker() 是一個創建和運行goWorker的函數,為后面獲取不到goWorker時先進行預創建goWorker
任務執行
任務執行就是開啟了一個協程,然后執行goWorker中channel的任務task。
func (w *goWorker) run() {
// pool的running 加 一
w.pool.addRunning(1)
go func() {
defer func() {
...
if p := recover(); p != nil {
//處理捕獲的panic
}
w.pool.cond.Signal()
}()
//任務執行
for f := range w.task {
if f == nil {
return
}
f()
//執行完后回收worker
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
}
goWorker放回pool
我們知道實際用戶的任務是綁定在goWorker上的, 在執行完任務之后Ants,會將該goWorker放回到workers結構的items數組中(協程池)。
func (p *Pool) revertWorker(worker *goWorker) bool {
if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
p.cond.Broadcast()
return false
}
// 重置空閑計時器,用于判定過期
worker.recycleTime = p.nowTime()
p.lock.Lock()
...
// 調用works的insert方法放回Pool
err := p.workers.insert(worker)
if err != nil {
p.lock.Unlock()
return false
}
// p.cond.Signal() 喚醒一個可能等待的線程
p.cond.Signal()
p.lock.Unlock()
return true
}