一個(gè)例子,給你講透典型的Go并發(fā)控制
Go中可以使用一個(gè)go關(guān)鍵字讓程序異步執(zhí)行
一個(gè)比較常見的場(chǎng)景:逐個(gè)異步調(diào)用多個(gè)函數(shù),或者循環(huán)中異步調(diào)用
func main() {
go do1()
go do2()
go do3()
}
// 或者
func main() {
for i := range []int{1,2,3}{
go do(i)
}
}
如果了解Go并發(fā)機(jī)制,就知道m(xù)ain在其他goroutine運(yùn)行完成之前就已經(jīng)結(jié)束了,所以上面代碼的運(yùn)行結(jié)果是不符合預(yù)期的。我們需要使用一種叫做并發(fā)控制的手段,來保證程序正確運(yùn)行
為了更容易理解,我們虛擬一個(gè)??
已知有一個(gè)現(xiàn)成的函數(shù)search,能夠按照關(guān)鍵詞執(zhí)行搜索
期望實(shí)現(xiàn)一個(gè)新的函數(shù)coSearch能夠進(jìn)行批量查詢
package main
import (
"context"
"errors"
"fmt"
"sync"
)
func search(ctx context.Context, word string) (string, error) {
if word == "Go" {
return "", errors.New("error: Go") // 模擬結(jié)果
}
return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}
func coSearch(ctx context.Context, words []string) (results []string, err error) {
//tbd
return
}
func main() {
words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
results, err := coSearch(context.Background(), words)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(results)
}
可以先暫停想想該如何實(shí)現(xiàn)coSearch函數(shù)
并發(fā)控制基礎(chǔ)
sync.WaitGroup是Go標(biāo)準(zhǔn)庫中用來控制并發(fā)的結(jié)構(gòu),這里放一個(gè)使用WaitGroup實(shí)現(xiàn)coSearch的示例
package main
import (
"context"
"errors"
"fmt"
"sync"
)
func search(ctx context.Context, word string) (string, error) {
if word == "Go" {
return "", errors.New("error: Go") // 模擬結(jié)果
}
return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}
func coSearch(ctx context.Context, words []string) ([]string, error) {
var (
wg = sync.WaitGroup{}
once = sync.Once{}
results = make([]string, len(words))
err error
)
for i, word := range words {
wg.Add(1)
go func(word string, i int) {
defer wg.Done()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
return results, err
}
func main() {
words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
results, err := coSearch(context.Background(), words)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(results)
}
上面的代碼中有非常多的細(xì)節(jié),來逐個(gè)聊一聊
?? sync.WaitGroup{}并發(fā)控制
sync.WaitGroup{}的用法非常簡(jiǎn)潔
- 當(dāng)新運(yùn)行一個(gè)goroutine時(shí),我們需要調(diào)用wg.Add(1)
- 當(dāng)一個(gè)goroutine運(yùn)行完成的時(shí)候,我們需要調(diào)用wg.Done()
- wg.Wait()讓程序阻塞在此處,直到所有的goroutine運(yùn)行完畢。
對(duì)于coSearch來說,等待所有g(shù)oroutine運(yùn)行完成,也就完成了函數(shù)的任務(wù),返回最終的結(jié)果
var (
wg = sync.WaitGroup{}
//...省略其他代碼
)
for i, word := range words {
wg.Add(1)
go func(word string, i int) {
defer wg.Done()
//...省略其他代碼
}(word, i)
}
wg.Wait()
?? for循環(huán)中的goroutine!
這是一個(gè)Go經(jīng)典錯(cuò)誤,如果goroutine中使用了for迭代的變量,所有g(shù)oroutine都會(huì)獲得最后一次循環(huán)的值。例如下面的示例,并不會(huì)輸出"a", "b", "c" 而是輸出 "c", "c", "c"
func main() {
done := make(chan bool)
values := []string{"a", "b", "c"}
for _, v := range values {
go func() {
fmt.Println(v)
done <- true
}()
}
// wait for all goroutines to complete before exiting
for _ = range values {
<-done
}
}
正確的做法就是像上文示例一樣,將迭代的變量賦值給函數(shù)參數(shù),或者賦值給新的變量
for i, word := range words {
// ...
go func(word string, i int) {
// fmt.Println(word, i)
}(word, i)
}
for i, word := range words {
i, word := i, word
go func() {
// fmt.Println(word, i)
}()
}
由于這個(gè)錯(cuò)誤實(shí)在太常見,從Go 1.22開始Go已經(jīng)修正了這個(gè)經(jīng)典的錯(cuò)誤:Fixing For Loops in Go 1.22。
不過Go 1.22默認(rèn)不會(huì)開啟修正,需要設(shè)置環(huán)境變量GOEXPERIMENT=loopvar才會(huì) 開啟
?? 并發(fā)安全
簡(jiǎn)單理解:當(dāng)多個(gè)goroutine對(duì)同一個(gè)內(nèi)存區(qū)域進(jìn)行讀寫時(shí),就會(huì)產(chǎn)生并發(fā)安全的問題,它會(huì)導(dǎo)致程序運(yùn)行的結(jié)果不符合預(yù)期
上面的示例把最終的結(jié)果放入了results = make([]string, len(words))中。雖然我們?cè)趃oroutine中并發(fā)的對(duì)于results變量進(jìn)行寫入,但因?yàn)槊恳粋€(gè)goroutine都寫在了獨(dú)立的位置,且沒有任何讀取的操作,因此results[i] = result是并發(fā)安全的
results = [ xxxxxxxx, xxxxxxxx, xxxxxxxx, .... ]
^ ^ ^
| | |
goroutine1 goroutine2 goroutine3
這也意味著如果使用results = append(results, result)的方式并發(fā)賦值,因?yàn)闀?huì)涉及到slice的擴(kuò)容等操作,所以并不是并發(fā)安全的,需要利用sync.Mutex{}進(jìn)行加鎖
如果想盡可能的提高程序的并發(fā)性能,推薦使用 results[i] = result這種方式賦值
?? sync.Once{}單次賦值
示例coSearch中,會(huì)返回第一個(gè)出錯(cuò)的search的error。err是一個(gè)全局變量,在并發(fā)goroutine中賦值是并發(fā)不安全的操作
//...省略其他代碼
go func(word string, i int) {
defer wg.Done()
result, e := search(ctx, word)
if e != nil && err == nil {
err = e
return
}
results[i] = result
}(word, i)
//...省略其他代碼
對(duì)于全局變量的賦值比較常規(guī)做法就是利用sync.Mutex{}進(jìn)行加鎖。但示例的邏輯為單次賦值,我們剛好可以利用同在sync庫的sync.Once{}來簡(jiǎn)化代碼
sync.Once{}功能如其名,將我們要執(zhí)行的邏輯放到它的Do()方法中,無論多少并發(fā)都只會(huì)執(zhí)行一次
//...省略其他代碼
go func(word string, i int) {
defer wg.Done()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
})
return
}
results[i] = result
}(word, i)
//...省略其他代碼
Further more
上面的示例coSearch已經(jīng)是一個(gè)比較完善的函數(shù)了,但我們還可以做得更多
?? goroutine數(shù)量控制
coSearch入?yún)⒌臄?shù)組可能非常大,如果不加以控制可能導(dǎo)致我們的服務(wù)器資源耗盡,我們需要控制并發(fā)的數(shù)量
利用帶緩沖channel可以實(shí)現(xiàn)
tokens := make(chan struct{}, 10)
for i, word := range words {
tokens <- struct{}{} // 新增
wg.Add(1)
go func(word string, i int) {
defer func() {
wg.Done()
<-tokens // 新增
}()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
如上,代碼中創(chuàng)建了10個(gè)緩沖區(qū)的channel,當(dāng)channel被填滿時(shí),繼續(xù)寫入會(huì)被阻塞;當(dāng)goroutine運(yùn)行完成之后,除了原有的wg.Done(),我們需要從channel讀取走一個(gè)數(shù)據(jù),來允許新的goroutine運(yùn)行
通過這種方式,我們控制了coSearch最多只能運(yùn)行10個(gè)goroutine,當(dāng)超過10個(gè)時(shí)需要等待前面運(yùn)行的goroutine結(jié)束
?? context.Context
并發(fā)執(zhí)行的goroutine只要有一個(gè)出錯(cuò),其他goroutine就可以停止,沒有必要繼續(xù)執(zhí)行下去了。如何把取消的事件傳導(dǎo)到其他goroutine呢?context.Context就是用來傳遞類似上下文信息的結(jié)構(gòu)
ctx, cancel := context.WithCancelCause(ctx) // 新增
defer cancel(nil) // 新增
for i, word := range words {
tokens <- struct{}{}
wg.Add(1)
go func(word string, i int) {
defer func() {
wg.Done()
<-tokens
}()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
cancel(e) // 新增
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
完整的代碼
最終完成的效果如下
package main
import (
"context"
"errors"
"fmt"
"sync"
)
func search(ctx context.Context, word string) (string, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
default:
if word == "Go" || word == "Java" {
return "", errors.New("Go or Java")
}
return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}
}
func coSearch(ctx context.Context, words []string) ([]string, error) {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)
var (
wg = sync.WaitGroup{}
once = sync.Once{}
results = make([]string, len(words))
tokens = make(chan struct{}, 2)
err error
)
for i, word := range words {
tokens <- struct{}{}
wg.Add(1)
go func(word string, i int) {
defer func() {
wg.Done()
<-tokens
}()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
cancel(e)
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
return results, err
}
并發(fā)控制庫errgroup
可以看到要實(shí)現(xiàn)一個(gè)較為完備的并發(fā)控制,需要做的工作非常多。不過Go官方團(tuán)隊(duì)為大家準(zhǔn)備了 golang.org/x/sync/errgroup
errgroup提供的能力和上文的示例類似,實(shí)現(xiàn)方式也類似,包含并發(fā)控制,錯(cuò)誤傳遞,context.Context傳遞等
package main
import (
"context"
"fmt"
"sync"
"golang.org/x/sync/errgroup"
)
func coSearch(ctx context.Context, words []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10)
results := make([]string, len(words))
for i, word := range words {
i, word := i, word
g.Go(func() error {
result, err := search(ctx, word)
if err != nil {
return err
}
results[i] = result
return nil
})
}
err := g.Wait()
return results, err
}
errgroup的用法也很簡(jiǎn)單
- 使用 g, ctx := errgroup.WithContext(ctx)來創(chuàng)建goroutine的管理器
- g.SetLimit()可以設(shè)置允許的最大的goroutine數(shù)量
- 類似于go關(guān)鍵詞, g.Go異步執(zhí)行函數(shù)
- g.Wait()和sync.WaitGroup{}的wg.Wait()類似,會(huì)阻塞直到所有g(shù)oroutine都運(yùn)行完成,并返回其中一個(gè)goroutine的錯(cuò)誤
利用golang.org/x/sync/errgroup大幅簡(jiǎn)化了進(jìn)行并發(fā)控制的邏輯,真是一個(gè)并發(fā)控制的利器啊!
總結(jié)
本篇從基礎(chǔ)的sync.WaitGroup{}庫出發(fā),涉及到了并發(fā)安全、sync.Once等內(nèi)容。最后介紹了并發(fā)控制的利器:golang.org/x/sync/errgroup。
雖然使用Go語言能夠非常簡(jiǎn)單的編寫并發(fā)程序,但其中要注意的細(xì)節(jié)非常多,忽略這些細(xì)節(jié)不僅沒有提升程序運(yùn)行的效率,還會(huì)產(chǎn)生錯(cuò)誤的結(jié)果