Go并發編程 — I/O聚合優化(動畫講解)
背景提要
在存儲系統中,在確保功能不受損的前提下,盡量的減少讀寫I/O的次數是優化的一個重要方向,也就是聚合I/O的場景。讀寫操作雖然都有聚合I/O的需求,但各自的重點和實現方法卻有所不同。接下來,我們將分別探討讀和寫請求的聚合優化方法。
讀請求的聚合
以讀操作中,緩存優化是一種常見的優化手段。具體做法是將讀取的數據存儲在內存中,并通過一個唯一的Key來索引這些數據。當讀請求來到時,如果該Key在緩存中沒有命中,那么就需要從后端存儲獲取。用戶請求直接穿透到后端存儲,如果并發很大,這可能是一個很大的風險。
例如,對于 Key:“test”,如果緩存中沒有相應的數據,并且突然出現大量并發讀取請求,每個請求都會發現緩存未命中。如果這些請求全部直接訪問后端存儲,可能會給后端存儲帶來巨大壓力。
為了應對這種情況,我們其實可以只允許一個讀請求去后端讀取數據,而其他并發請求則等待這個請求的結果。這就是讀請求聚合的基本原理。
在Go語言中,可以使用singleflight 這類第三方庫完成上述需求。singleflight的設計理念是“單一請求執行”,即針對同一個Key,在多個并發請求中只允許一個請求訪問后端。
01 - 讀請求聚合的使用姿勢
下面是一個使用 singleflight 的示例,展現了如何通過傳入特定的Key和閉包函數來聚合并發請求。
package main
import (
// ...
"golang.org/x/sync/singleflight"
)
func main() {
var g singleflight.Group
var wg sync.WaitGroup
// 模擬多個 goroutine 并發請求相同的資源
for i := 0; i < 5; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
v, err, shared := g.Do("objectkey", func() (interface{}, error) {
fmt.Printf("協程ID:%v 正在執行...\n", idx)
time.Sleep(2 * time.Second)
return "objectvalue", nil
})
if err != nil {
log.Fatalf("err:%v", err)
}
fmt.Printf("協程ID:%v 請求結果: %v, 是否共享結果: %v\n", idx, v, shared)
}(i)
}
wg.Wait()
}
在這個例子中,多個Goroutine并發地請求Key為“objectkey”的資源。通過singleflight,我們確保只有一個Goroutine去執行實際的數據加載操作,而其他請求則等待這個操作的結果。接下來,我們將探討 singleflight 的原理。
02 - singleflight的原理
singleflight 庫提供了一個Group結構體,用于管理不同的請求,意圖在內部實現聚合的效果。定義如下:
type Group struct {
mu sync.Mutex // 互斥鎖,包含下面的映射表
m map[string]*call // 正在執行請求的映射表
}
Group結構的核心就是這個map結構。每個正在執行的請求被封裝在 call 結構中,定義如下:
type call struct {
wg sync.WaitGroup // 用于同步并發的請求
val interface{} // 用于存放執行的結果
err error // 存放執行的結果
dups int // 用于計數聚合的請求
// ...其他字段用于處理特殊情況和提高容錯性
}
Group結構的Do方法實現了聚合去重的核心邏輯,代碼實現如下所示:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
// 用 map 結構,來判斷是否已經有對應 Key 正在執行的請求
if c, ok := g.m[key]; ok {
c.dups++
// 如果有對應 Key 的請求正在執行,那么等待結果即可。
g.mu.Unlock()
c.wg.Wait()
// ...
return c.val, c.err, true
}
// 創建一個代表執行請求的結構,和 Key 關聯起來,存入map中
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn) // 真正執行請求
return c.val, c.err, c.dups > 0
}
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
defer func() {
// ...省略異常處理
c.wg.Done()
}()
func() {
// 真正執行請求
c.val, c.err = fn()
}()
// ...
}
通過上述代碼,singleflight的Group結構體利用map記錄了正在執行的請求,關聯了請求的Key和執行體。當新的請求到來時,先檢查是否有相同Key的正在執行的請求,如果有,則等待起結果,從而避免重復執行相同的請求。
動畫示意圖:
圖片
對于讀操作,singleflight通過這種方式有效地減少了重復工作。然而,對于寫操作,處理邏輯會有所不同,它需要額外的機制來保證數據落盤的時序。
寫請求的聚合
我們先回憶一下寫操作的姿勢。首先通過Write系統調用來寫入數據,默認情況下此時數據可能僅駐留在PageCache中,為了確保數據安全落盤,此時我們需要手動調用一次 Sync 系統調用。
然而,Sync操作的成本相當大,并且它除了數據,還會同步元數據等其他信息到磁盤上。對于性能影響巨大。并且,在機械盤的場景下,串行化的執行Sync是更好的實踐。
因此,我們面臨的一個問題是:如果在不犧牲數據安全性的前提下,能否減少Sync的次數呢?
對于同一個文件的寫操作,合并Sync操作是可行的。
文件的Sync會將當前時刻文件在內存中的全部數據一次性同步到磁盤。無論之前執行過多少次Write調用,一次Sync就能全部刷盤。這正是聚合寫請求以優化性能的關鍵所在。
01 - 寫聚合的原理
假設對同一個文件寫了三次數據,每一次都是Write+Sync的操作。那么在合適的時機,三次Sync調用可以優化成一次。如下圖所示:
圖片
請求 C 的 Sync 操作是在所有請求的 Write 之后才發起的,所以它必定能保證在此之前的所有變更的數據都安全落盤。這就是寫操作聚合的根本原理。
接下來我們來思考兩個問題。
問題一:有童鞋可能會問,讀寫聚合優化感覺有一點相似?那能否用 singleflight 聚合寫操作呢?
例如,當并發調用 Sync 的時候,如果發現有正在執行的Sync,能否共享這次Sync請求呢?
答案是:不可以。使用singleflight來優化寫無法保證數據的安全性。
我們必須要保證的是,Sync操作一定要在Write完成之后發起。只要兩者存在并發的可能性,那么Sync就不能保證攜帶了這次Write操作的數據,也就無法保證安全性。
示意圖:
圖片
還是以上面的圖為例來說明,當請求 B 完成 Write 操作后,看到請求 A 已經發起了 Sync 操作。此時它是無法判斷請求 A 的 Sync 操作是否包含了請求 B 的數據。從圖示我們也很清晰的看到,請求B的 Write 和請求 A 的 Sync 在時間上存在重疊。
因此,當Write完成后,如果發現有一個Sync正在執行,我們不能簡單地復用這個Sync。我們需要啟動一個新的Sync操作。
問題二:那么聚合的時機在哪里呢?
對于讀請求的聚合,其時機相對直觀:一旦發現有針對同一個 Key 的請求,就可以等待這次的結果并復用該結果。但寫請求的聚合時機則不是,它的聚合時機是在等待中遇到“志同道合“的請求。
讓我們通過一個具體例子來說明(注意,以下所有的請求都是針對相同的文件):
- t0 時刻:A 執行了 Write,并嘗試發起Sync,由于此時沒有其他請求在執行,A 便執行真正的Sync操作。
- t1 時刻:B 執行了 Write,發現已經有請求在Sync了(即A),因此進入等待狀態,直到A完成。
- t2 時刻:C 執行了 Write,發現已經有請求在Sync了(即A),因此進入等待狀態,直到A完成。
- t3 時刻:D 執行了 Write,發現已經有請求在Sync了(即A),因此進入等待狀態,直到A完成。
- t4 時刻:A 的Sync操作終于完成。A隨即通知 B、C、D 三位,告知它們可以進行Sync請求了。
- t5 時刻:從B、C、D中選擇一個來執行一次Sync操作。假設B被選中,則C、D請求則等待B完成Sync即可。B發起的Sync操作一定包含了B,C,D三者寫的數據,確保了安全性。
- t6:B 的Sync操作完成,C、D被通知操作已完成。如此一來,B、C、D三者的數據都確保落盤。
正如上述所演示,寫操作的聚合是在等待前一次Sync操作完成期間收集到的請求。本來需要4次Sync操作,現在僅需2次Sync就可以確保數據的安全性。
在高并發的場景下,這種聚合方式的效益尤為顯著。下面,我們將探討這種策略的具體代碼實現。
02 - 寫聚合的代碼實現
實現寫操作聚合的關鍵在于確保數據安全的時序前提下進行聚合。以下是一種典型和實現方式,它是對 sync.Cond 和 sync.Once 的巧妙應用。首先,我們定義一個負責聚合的結構體,如下:
// SyncJob 用于管理一個文件的 Sync 任務
type SyncJob struct {
*sync.Cond // 聚合 Sync 的關鍵
holding int32 // 記錄聚合的個數
lastErr error // 記錄執行 Sync 結果
syncPoint *sync.Once // 確保同一時間只有一個 Sync 執行
syncFunc func(interface{}) error // 實際執行 Sync 的函數
}
// SyncJob 的構建函數
func NewSyncJob(fn func(interface{}) error) *SyncJob {
return &SyncJob{
Cond: sync.NewCond(&sync.Mutex{}),
syncFunc: fn,
syncPoint: &sync.Once{},
}
}
接下來,我們為 SyncJob 定義一個執行聚合的方法,如下:
func (s *SyncJob) Do(job interface{}) error {
s.L.Lock()
if s.holding > 0 {
// 如果有請求在前面,則等待前一次請求完成。
// 等待的過程中,會有"志同道合"之人
s.Wait()
}
// 準備要下發請求了,增加計數
s.holding += 1
syncPoint := s.syncPoint
s.L.Unlock()
// "志同道合"的人一起來到這里,此時已經滿足 Write 和 Sync 的時序關系。
// 使用 sync.Once 確保只有請求者執行同步操作。
syncPoint.Do(func() {
// 執行實際的 Sync 操作
s.lastErr = s.syncFunc(job)
s.L.Lock()
// holding 展示本批次有多少個請求
fmt.Printf("holding:%v\n", s.holding)
// 本次請求執行完成,重置計數器,準備下一輪聚合
s.holding = 0
s.syncPoint = &sync.Once{}
// 喚醒下一批的請求
s.Broadcast()
s.L.Unlock()
})
return s.lastErr
}
在這里,我們使用了一個Go的 sync.Cond 來阻塞和通知等待中的請求,并通過 sync.Once 確保同步操作同一時間、同一批只有一個在執行。
- 其實在這個場景下,從代碼實現來講,sync.Cond 也可以使用 Go 的 Channel 來實現相同的效果,用 Ch← 來阻塞,用 close(Ch) 來通知。效果是一樣的,感興趣的童鞋可以改造試試。
現在讓我們來看看這段代碼的實際運行效果:
func main() {
file, err := os.OpenFile("hello.txt", os.O_RDWR, 0700)
if err != nil {
log.Fatal(err)
}
defer file.Close()
// 初始化 Sync 聚合服務
syncJob := NewSyncJob(func(interface{}) error {
fmt.Printf("do sync...\n")
time.Sleep(time.Second())
return file.Sync()
})
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 執行寫操作 write ...
fmt.Printf("write...\n")
// 觸發 sync 操作
syncJob.Do(file)
}()
}
wg.Wait()
}
通過上述代碼,我們講對文件寫入操作后的 Sync 調用進行有效的聚合。童鞋們可以多次運行程序,觀察其行為。可以通過觀察打印的 holding 字段獲悉每一批聚合的請求是多少個。
思考:從效果來講,上面的代碼無論怎么跑,最少要執行兩次 Sync。你知道是為什么嗎?
動畫示意圖:
圖片
總結
上面介紹了讀寫聚合優化的兩種實現。讀和寫的聚合是有區別的。
- 讀操作,核心是一個 map,只要有相同Key的讀取正在執行,那么等待這份正在執行的請求的結果也是符合預期的。同步等待則用的是 sync.WaitGroup 來實現。
- 寫操作,核心是要先保證數據安全性。它必須保證 Sync 操作在 Write 操作之后。因此當發現有正在執行的Sync操作,那么就等待這次完成,然后必須重新開啟一輪的 Sync 操作,等待的過程也是聚合的時機。我們可以使用 sync.Cond(或者 Channel )來實現阻塞和喚醒,使用 sync.Once 來保證同一時間單個執行。