成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

慢聊Golang協程池Ants實現原理

開發 前端
我們知道實際用戶的任務是綁定在goWorker上的, 在執行完任務之后Ants,會將該goWorker放回到workers結構的items數組中(協程池)。

大家都知道goroutine 是 Go語言中的輕量級線程實現,由 Go 運行時(runtime)管理,Go 程序會智能地將 goroutine 中的任務合理地分配給每個 CPU。創建一個goroutine大小大概在2k左右,可以說非常的節省機器資源。

但是為什么要用池化的方式呢?機器資源總是有限的,如果創建了幾十萬個goroutine,那么就消耗比較大了,在一些需要對并發資源進行控制、提升性能、控制生命周期的場景中,還是需要用到協程池去處理。

今天就介紹在github用Go語言實現的有 11.5k?的 Ants 協程池庫的實現!

圖片圖片

初識Ants

Ants介紹

Go的協程非常輕量,但是在超高并發場景,每個請求創建一個協程也是低效的,一個簡單的思想就是協程池。

Ants實現了一個具有固定容量的goroutine池,管理和回收大量goroutine,允許開發人員限制并發程序中的goroutines數量。

圖片圖片

Github地址:https://github.com/panjf2000/ants

這是在github上的截圖,注意不同版本之間代碼實現會略有差異。

圖片圖片

特性

??Ants具有如下特性:

  • ? 自動管理和回收大量goroutine
  • ? 定期清除過期的goroutines
  • ? 豐富的API:提交任務,獲取運行goroutine的數量,動態調整池的容量,釋放池,重新啟動池
  • ? 優雅地處理死機以防止程序崩潰
  • ? 高效的內存使用,甚至比Golang中的無限goroutine實現了更高的性能
  • ? 非阻塞機制

核心概念

  • ? Pool :Ants協程池核心結構
  • ? WorkerArray:Pool池中的worker隊列,存放所有的Worker
  • ? goWorker:運行任務的實際執行者,它啟動一個 goroutine 來接受任務并執行函數調用
  • ? sync.Pool:golang 標準庫下并發安全的對象池,緩存申請用于之后的重用,以減輕GC的壓力
  • ? spinLock:基于CAS機制和指數退避算法實現的一種自旋鎖

運行流程圖

Ants運行流程圖如下:

圖片圖片

前置知識

自旋鎖 spinLock

我們先了解下什么是自旋鎖!

加鎖的目的就是保證共享資源在任意時間里,只有一個線程訪問,而自旋鎖加鎖失敗后,線程會忙等待,直到它拿到鎖。

圖片圖片

如果要實現鎖的話需要實現Go 標準庫sync的Locker接口

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
     Lock()
     Unlock()
}

Ants的自旋鎖是基于CAS機制和指數退避算法實現的一種自旋鎖,主要利用了下面幾個關鍵的點:

  • ? sync.Locker接口
  • ? 指數退避算法
  • ? sync. atomic 原子包中的方法了解
  • ? runtime.Gosched() 讓當前goroutine讓出CPU時間片

?? Go語言中 sync/atomic包提供了底層的原子級內存操作,可實用CAS 函數(Compare And Swap)

?? 指數退避算法以指數方式重試請求,請求失敗后重試間隔分別是 1、2、4 ...,2的n次方秒增加

我們看下具體實現代碼和添加的注釋:

//實現Locker接口
type spinLock uint32
//最大回退次數
const maxBackoff = 16
// 加鎖
func (sl *spinLock) Lock() {
    backoff := 1
    //基于CAS機制,嘗試獲取鎖
    for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
        //執行backoff次 cpu讓出時間片次數
        for i := 0; i < backoff; i++ {
            //使當前goroutine讓出CPU時間片
            runtime.Gosched()
        }
        if backoff < maxBackoff {
            //左移后賦值 等于 backoff = backoff << 1
            //左移一位就是乘以 2的1次方
            backoff <<= 1
        }
    }
}

//釋放鎖
func (sl *spinLock) Unlock() {
    atomic.StoreUint32((*uint32)(sl), 0)
}

Gosched()使當前goroutine程放棄處理器,以讓其它goroutine運行,它不會掛起當前goroutine,因此當前goroutine未來會恢復執行。

?? backoff <<= 1 這段代碼會有你知道什么意思嗎?

這是Go語言的位運算符 << 表示左移n位就是乘以2的n次方, 而 <<= 表示左移后賦值。

??代碼中 backoff <<= 1 其實就是 backoff = backoff << 1,這是左移一位的結果就是 backoff = backoff * 2^1。

自旋鎖執行 backoff 次讓 cpu 時間片動作,次數分別是 1、2、4 ...,封頂16

Ants自旋鎖邏輯用圖表示如下:

圖片圖片

核心數據結構

這里簡單介紹下三個核心的結構體和屬性:

圖片圖片

Pool結構體

Pool就是協程池的實際結構,在下面代碼中已經標記了注釋。

type Pool struct {
    // 協程池容量 
    capacity int32
    // 當前協程池中正在運行的協程數
    running int32
    // ants 實現的自旋鎖,用于同步并發操作
    lock sync.Locker
    // 存放一組Worker
    workers workerArray
    // 協程池狀態 (1-關閉、0-開啟)
    state int32
    // 并發協調器,用于阻塞模式下,掛起和喚醒等待資源的協程
    cond *sync.Cond
    // worker 對象池
    workerCache sync.Pool
    // 等待的協程數量
    waiting int32
    // 回收協程是否關閉
    heartbeatDone int32
    // 閉回收協程的控制器函數
    stopHeartbeat context.CancelFunc
    // 協程池的配置
    options *Options
}

這里對幾個配置著重講一下:

workerCache :這是sync.Pool類型,主要作用保存和復用臨時對象,減少內存分配,降低 GC 壓力,在Ants中是為了緩存釋放的 Worker 資源

options:可配置化過期時間、是否支持預分配、最大阻塞數量、panic 處理、日志,這里是通過函數式選項模式進行實現的

goWorker

goWorker 是運行任務的實際執行者,它啟動一個 goroutine 來接受任務并執行函數調用,這個協程是一個長期運行不會被主動回收的。

type goWorker struct {
    //goWorker 所屬的協程池
    pool *Pool
    //接收實際執行任務的管道
    task chan func()
    //goWorker 回收到協程池的時間
    recycleTime time.Time
}

WorkerArray

workerArray 是一個接口( interface),其實現包含 stack 棧版本和 queue 隊列兩種實現。

圖片圖片

它定義了幾個通用和用于回收過期 goWorker 的 api

type workerArray interface {
 // worker 列表長度
 len() int 
 // 是否為空
 isEmpty() bool
 // 插入一個goworker
 insert(worker *goWorker) error 
 // 從WorkerArray獲取可用的goworker
 detach() *goWorker 
 // 清理pool.workers中的過期goworker
 retrieveExpiry(duration time.Duration) []*goWorker  
 // 重置,清空WorkerArray中所有的goWorker
 reset() 
}

核心方法

這是核心實現代碼的走讀部分,基本上都有進行了注釋,看起來可能會有點不怎么理解,多看兩遍就好,相信我 ????!

創建Pool

創建Pool其實就是New一個Pool實例,對Pool中結構體的屬性進行初始化、加載一些配置,這種方式很常見,大家可以注意觀察積累。

圖片圖片

代碼實現和注釋如下:

func NewPool(size int, options ...Option) (*Pool, error) {
    //讀取一些自定義的配置
    opts := loadOptions(options...)

    ...
    // 創建 Pool 對象
    p := &Pool{
        capacity: int32(size),
        lock:     internal.NewSpinLock(),
        options:  opts,
    }
     // 指定 sync.Pool 創建 worker 的方法
    p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), workerChanCap),
        }
    }
    // 初始化Pool時是否進行內存預分配
    // 區分workerArray 的實現方式
    if p.options.PreAlloc {
        if size == -1 {
            return nil, ErrInvalidPreAllocSize
        }
        // 預先分配固定 Size 的池子
        p.workers = newWorkerArray(loopQueueType, size)
    } else {
        // 初始化不創建,運行時再創建
        p.workers = newWorkerArray(stackType, 0)
    }

    p.cond = sync.NewCond(p.lock)

    // 開啟一個goroutine清理過期的 worker
    go p.purgePeriodically()

    return p, nil
}

workerChanCap:確定工作程序的通道是否應為緩沖通道,當獲取給GOMAXPROCS設置的值等于1時表示單核執行,此時的通道是無緩沖通道,否則是有緩沖通道,且容量是1。

這里講的是默認未進行預分配,采用 workerStack 棧實現workerArray的初始化。

清理過期goWorker

在初始化好Pool結構屬性后,會開啟一個goroutine清理過期的 worker。

??怎么判定goroutine是過期的?

Ants過期的定義是:每個 goWorker的 recycleTime 加上用戶配置的過期時間 Pool.options.ExpiryDuration 小于 time.Now() 時即認為該協程已過期。

我們看下具體流程

func (p *Pool) purgePeriodically(ctx context.Context) {
    // ExpiryDuration 默認是1s
    heartbeat := time.NewTicker(p.options.ExpiryDuration)
    ...
    for {
        select {
        case <-heartbeat.C:
        case <-ctx.Done():
            return
        }
        // pool關閉
        if p.IsClosed() {
            break
        }
        // 從 workers 中獲取過期的 worker
        p.lock.Lock()
        expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
        p.lock.Unlock()
        // 清理過期的worker
        for i := range expiredWorkers {
            expiredWorkers[i].task <- nil
            expiredWorkers[i] = nil
        }
        // 喚醒所有等待的線程
        if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
            p.cond.Broadcast()
        }
    }
}

??清理流程如下:

  1. 1. 取出過期的goWorker
  2. 2. 通知 goWorker 退出,方式是向過期 goWorker 的 task channel 發送一個 nil
  3. 3. 接收值為 nil 的任務后goWorker會退出
  4. 4. 所有工作程序都已清理完畢,可能這時還有 goroutine 阻塞在cond.Wait上,會調用 p.cond.Broadcast() 喚醒這些 goroutine

Submit任務提交

在初始化完成Pool之后,就需要往池中提交帶執行任務了,Pool提供了 Submit 方法,提供外部發起提交任務的接口。

func (p *Pool) Submit(task func()) error {
    // pool是否關閉
    if p.IsClosed() {
        return ErrPoolClosed
    }
    var w *goWorker
    // 嘗試獲取空閑的goWorker
    if w = p.retrieveWorker(); w == nil {
        return ErrPoolOverload
    }
    // 發送到 goWorker的channel中
    w.task <- task
    return nil
}

獲取可用goWork

Submit方法內部調用 pool.retrieveWorker 方法并嘗試獲取一個空閑的 goWorker,如果獲取成功會將任務發送到goWorker的channel類型task中。

func (p *Pool) retrieveWorker() (w *goWorker) {
    //創建一個新的goWorker,并執行
    spawnWorker := func() {
        //實例化 worker
        w = p.workerCache.Get().(*goWorker)
        // 運行
        w.run()
    }

    // 加鎖
    p.lock.Lock()
    // 從workers 中取出一個 goWorker
    // workerStack 實現了p.workers的方法
    w = p.workers.detach()
    if w != nil { 
        p.lock.Unlock()
    // Pool容量大于正在工作的 goWorker 數量)
    //則調用 spawnWorker() 新建一個 goWorker
    } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
        
        p.lock.Unlock()
        spawnWorker()
    } else { 
          // options設置了非阻塞選項,直接返回 nil
          if p.options.Nonblocking {
            p.lock.Unlock()
            return
        }
    retry:
        //option設置了最大阻塞隊列,當前阻塞等待的任務數量已經達設置上限,直接返回 nil
        if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
            p.lock.Unlock()
            return
        }
        ...
        var nw int
        //如果正在執行的worker數量為0時,則重新創建woker
        if nw = p.Running(); nw == 0 { 
            p.lock.Unlock()
            spawnWorker()
            return
        }
        //p.workers中獲取可用的worker
        //執行開頭創建的spawnWorker
        if w = p.workers.detach(); w == nil {
            if nw < p.Cap() {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto retry
        }
        p.lock.Unlock()
    }
    return
}

??看完注釋后理一理retrieveWorker的執行邏輯:

  1. 1. 聲明一個spawnWorker,從對象池 workerCache 中獲取 goWorker
  2. 2. 嘗試從 workers 中取出可用的 goWorker
  3. 3. 如未達到協程池的容量限制,獲取并啟動 spawnWorker(goWorker)
  4. 4. 如何用戶設置了非阻塞選項,直接返回空的goWorker
  5. 5. 如果正在執行的goWorker 的數量等于0,調用 spawnWorker()
  6. 6. 未獲取到goWorker,并且Pool容量未滿,同樣調用 spawnWorker()

?? spawnWorker() 是一個創建和運行goWorker的函數,為后面獲取不到goWorker時先進行預創建goWorker

任務執行

任務執行就是開啟了一個協程,然后執行goWorker中channel的任務task。

func (w *goWorker) run() {
    // pool的running 加 一
    w.pool.addRunning(1)
    go func() {
        defer func() {
            ...
            if p := recover(); p != nil {
                //處理捕獲的panic
            }
            w.pool.cond.Signal()
        }()
        //任務執行
        for f := range w.task {
            if f == nil {
                return
            }
            f()
            //執行完后回收worker
            if ok := w.pool.revertWorker(w); !ok {
                return
            }
        }
    }()
}

goWorker放回pool

我們知道實際用戶的任務是綁定在goWorker上的, 在執行完任務之后Ants,會將該goWorker放回到workers結構的items數組中(協程池)。

func (p *Pool) revertWorker(worker *goWorker) bool {
    if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
        p.cond.Broadcast()
        return false
    }
    // 重置空閑計時器,用于判定過期
    worker.recycleTime = p.nowTime()
    p.lock.Lock()
    ...
    // 調用works的insert方法放回Pool
    err := p.workers.insert(worker)
    if err != nil {
        p.lock.Unlock()
        return false
    }
    // p.cond.Signal() 喚醒一個可能等待的線程
    p.cond.Signal()
    p.lock.Unlock()
    return true
}

責任編輯:武曉燕 來源: 小許code
相關推薦

2021-06-08 09:49:01

協程池Golang設計

2023-12-04 07:31:41

Golangwebsocket

2021-05-20 09:14:09

Kotlin協程掛起和恢復

2023-12-05 13:46:09

解密協程線程隊列

2017-05-02 11:38:00

PHP協程實現過程

2024-02-05 09:06:25

Python協程Asyncio庫

2025-02-28 09:04:08

2022-11-21 06:55:08

golang協程

2021-09-16 09:59:13

PythonJavaScript代碼

2021-08-04 16:19:55

AndroidKotin協程Coroutines

2022-10-28 10:45:22

Go協程GoFrame

2025-06-03 00:00:02

Go協程鎖機制

2023-04-19 21:20:49

Tars-Cpp協程

2025-06-10 02:00:00

Golangmap

2024-05-29 08:05:15

Go協程通信

2023-11-17 11:36:59

協程纖程操作系統

2023-11-23 08:31:51

競爭鎖共享字段

2025-06-26 04:10:00

2020-06-11 11:36:49

線程池Java場景

2024-10-18 10:27:50

PHP框架webma
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 美日韩免费视频 | 第一区在线观看免费国语入口 | 国产精品夜色一区二区三区 | 日韩在线看片 | 精品久久一区 | 好好的日在线视频 | 亚洲成人a v | 午夜爱爱网 | 国产一区免费 | 九色国产 | 国产精品久久久久久 | 一区二区视频在线 | 免费久久视频 | 九色网址| 国产亚洲欧美另类一区二区三区 | 亚洲高清在线 | 男人天堂999 | 精品国产一区二区三区成人影院 | 国产一区二区在线视频 | 久久精品国产一区 | 欧美在线一区二区视频 | 亚洲一区二区三区久久久 | 久久久久国产精品人 | 皇色视频在线 | 国产精品福利网站 | 国产精品视频不卡 | 欧美一级欧美三级在线观看 | 欧美日韩第一页 | 四虎影院在线观看免费视频 | 97精品一区二区 | 久久精品色欧美aⅴ一区二区 | se婷婷| 日本精品一区二区三区在线观看视频 | 国产精品视频网站 | 夜夜草视频| 欧美精品在线免费观看 | 午夜小视频在线播放 | 欧美成人免费电影 | 欧美日韩在线免费观看 | 亚洲一区在线观看视频 | 亚洲在线久久 |