成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

一個(gè)例子,給你講透典型的Go并發(fā)控制

開發(fā) 前端
本篇從基礎(chǔ)的sync.WaitGroup{}?庫出發(fā),涉及到了并發(fā)安全、sync.Once?等內(nèi)容。最后介紹了并發(fā)控制的利器:golang.org/x/sync/errgroup。

Go中可以使用一個(gè)go關(guān)鍵字讓程序異步執(zhí)行

一個(gè)比較常見的場(chǎng)景:逐個(gè)異步調(diào)用多個(gè)函數(shù),或者循環(huán)中異步調(diào)用

func main() {
 go do1()
 go do2()
 go do3()
}

// 或者

func main() {
 for i := range []int{1,2,3}{
  go do(i)
 }
}

如果了解Go并發(fā)機(jī)制,就知道m(xù)ain在其他goroutine運(yùn)行完成之前就已經(jīng)結(jié)束了,所以上面代碼的運(yùn)行結(jié)果是不符合預(yù)期的。我們需要使用一種叫做并發(fā)控制的手段,來保證程序正確運(yùn)行

為了更容易理解,我們虛擬一個(gè)??

已知有一個(gè)現(xiàn)成的函數(shù)search,能夠按照關(guān)鍵詞執(zhí)行搜索

期望實(shí)現(xiàn)一個(gè)新的函數(shù)coSearch能夠進(jìn)行批量查詢

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 if word == "Go" {
  return "", errors.New("error: Go") // 模擬結(jié)果
 }
 return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}

func coSearch(ctx context.Context, words []string) (results []string, err error) {
 //tbd

 return
}

func main() {
 words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
 results, err := coSearch(context.Background(), words)
 if err != nil {
  fmt.Println(err)
  return
 }

 fmt.Println(results)
}

可以先暫停想想該如何實(shí)現(xiàn)coSearch函數(shù)

并發(fā)控制基礎(chǔ)

sync.WaitGroup是Go標(biāo)準(zhǔn)庫中用來控制并發(fā)的結(jié)構(gòu),這里放一個(gè)使用WaitGroup實(shí)現(xiàn)coSearch的示例

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 if word == "Go" {
  return "", errors.New("error: Go") // 模擬結(jié)果
 }
 return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
}

func coSearch(ctx context.Context, words []string) ([]string, error) {
 var (
  wg      = sync.WaitGroup{}
  once    = sync.Once{}
  results = make([]string, len(words))
  err     error
 )

 for i, word := range words {
  wg.Add(1)

  go func(word string, i int) {
   defer wg.Done()

   result, e := search(ctx, word)
   if e != nil {
    once.Do(func() {
     err = e
    })

    return
   }

   results[i] = result
  }(word, i)
 }

 wg.Wait()

 return results, err
}

func main() {
 words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
 results, err := coSearch(context.Background(), words)
 if err != nil {
  fmt.Println(err)
  return
 }

 fmt.Println(results)
}

上面的代碼中有非常多的細(xì)節(jié),來逐個(gè)聊一聊

?? sync.WaitGroup{}并發(fā)控制

sync.WaitGroup{}的用法非常簡(jiǎn)潔

  • 當(dāng)新運(yùn)行一個(gè)goroutine時(shí),我們需要調(diào)用wg.Add(1)
  • 當(dāng)一個(gè)goroutine運(yùn)行完成的時(shí)候,我們需要調(diào)用wg.Done()
  • wg.Wait()讓程序阻塞在此處,直到所有的goroutine運(yùn)行完畢。

對(duì)于coSearch來說,等待所有g(shù)oroutine運(yùn)行完成,也就完成了函數(shù)的任務(wù),返回最終的結(jié)果

var (
    wg      = sync.WaitGroup{}
    //...省略其他代碼
)

for i, word := range words {
    wg.Add(1)

    go func(word string, i int) {
        defer wg.Done()
  //...省略其他代碼
    }(word, i)
}

wg.Wait()

?? for循環(huán)中的goroutine!

這是一個(gè)Go經(jīng)典錯(cuò)誤,如果goroutine中使用了for迭代的變量,所有g(shù)oroutine都會(huì)獲得最后一次循環(huán)的值。例如下面的示例,并不會(huì)輸出"a", "b", "c" 而是輸出 "c", "c", "c"

func main() {
    done := make(chan bool)

    values := []string{"a", "b", "c"}
    for _, v := range values {
        go func() {
            fmt.Println(v)
            done <- true
        }()
    }

    // wait for all goroutines to complete before exiting
    for _ = range values {
        <-done
    }
}

正確的做法就是像上文示例一樣,將迭代的變量賦值給函數(shù)參數(shù),或者賦值給新的變量

for i, word := range words {
 // ...
    go func(word string, i int) {
        // fmt.Println(word, i)
    }(word, i)
}

for i, word := range words {
    i, word := i, word
    go func() {
        // fmt.Println(word, i)
    }()
}

由于這個(gè)錯(cuò)誤實(shí)在太常見,從Go 1.22開始Go已經(jīng)修正了這個(gè)經(jīng)典的錯(cuò)誤:Fixing For Loops in Go 1.22。

不過Go 1.22默認(rèn)不會(huì)開啟修正,需要設(shè)置環(huán)境變量GOEXPERIMENT=loopvar才會(huì) 開啟

??  并發(fā)安全

簡(jiǎn)單理解:當(dāng)多個(gè)goroutine對(duì)同一個(gè)內(nèi)存區(qū)域進(jìn)行讀寫時(shí),就會(huì)產(chǎn)生并發(fā)安全的問題,它會(huì)導(dǎo)致程序運(yùn)行的結(jié)果不符合預(yù)期

上面的示例把最終的結(jié)果放入了results = make([]string, len(words))中。雖然我們?cè)趃oroutine中并發(fā)的對(duì)于results變量進(jìn)行寫入,但因?yàn)槊恳粋€(gè)goroutine都寫在了獨(dú)立的位置,且沒有任何讀取的操作,因此results[i] = result是并發(fā)安全的

results = [ xxxxxxxx,    xxxxxxxx,    xxxxxxxx,    .... ]
                ^            ^            ^       
                |            |            |       
           goroutine1   goroutine2    goroutine3

這也意味著如果使用results = append(results, result)的方式并發(fā)賦值,因?yàn)闀?huì)涉及到slice的擴(kuò)容等操作,所以并不是并發(fā)安全的,需要利用sync.Mutex{}進(jìn)行加鎖

如果想盡可能的提高程序的并發(fā)性能,推薦使用 results[i] = result這種方式賦值

?? sync.Once{}單次賦值

示例coSearch中,會(huì)返回第一個(gè)出錯(cuò)的search的error。err是一個(gè)全局變量,在并發(fā)goroutine中賦值是并發(fā)不安全的操作

//...省略其他代碼
go func(word string, i int) {
    defer wg.Done()

    result, e := search(ctx, word)
    if e != nil && err == nil {
        err = e

        return
    }

    results[i] = result
}(word, i)
//...省略其他代碼

對(duì)于全局變量的賦值比較常規(guī)做法就是利用sync.Mutex{}進(jìn)行加鎖。但示例的邏輯為單次賦值,我們剛好可以利用同在sync庫的sync.Once{}來簡(jiǎn)化代碼

sync.Once{}功能如其名,將我們要執(zhí)行的邏輯放到它的Do()方法中,無論多少并發(fā)都只會(huì)執(zhí)行一次

//...省略其他代碼
go func(word string, i int) {
    defer wg.Done()

    result, e := search(ctx, word)
    if e != nil {
        once.Do(func() {
            err = e
        })

        return
    }

    results[i] = result
}(word, i)
//...省略其他代碼

Further more

上面的示例coSearch已經(jīng)是一個(gè)比較完善的函數(shù)了,但我們還可以做得更多

?? goroutine數(shù)量控制

coSearch入?yún)⒌臄?shù)組可能非常大,如果不加以控制可能導(dǎo)致我們的服務(wù)器資源耗盡,我們需要控制并發(fā)的數(shù)量

利用帶緩沖channel可以實(shí)現(xiàn)

tokens := make(chan struct{}, 10)

for i, word := range words {
    tokens <- struct{}{} // 新增
    wg.Add(1)

    go func(word string, i int) {
        defer func() {
            wg.Done()
            <-tokens  // 新增
        }()

        result, e := search(ctx, word)
        if e != nil {
            once.Do(func() {
                err = e
            })

            return
        }

        results[i] = result
    }(word, i)
}

wg.Wait()

如上,代碼中創(chuàng)建了10個(gè)緩沖區(qū)的channel,當(dāng)channel被填滿時(shí),繼續(xù)寫入會(huì)被阻塞;當(dāng)goroutine運(yùn)行完成之后,除了原有的wg.Done(),我們需要從channel讀取走一個(gè)數(shù)據(jù),來允許新的goroutine運(yùn)行

通過這種方式,我們控制了coSearch最多只能運(yùn)行10個(gè)goroutine,當(dāng)超過10個(gè)時(shí)需要等待前面運(yùn)行的goroutine結(jié)束

?? context.Context

并發(fā)執(zhí)行的goroutine只要有一個(gè)出錯(cuò),其他goroutine就可以停止,沒有必要繼續(xù)執(zhí)行下去了。如何把取消的事件傳導(dǎo)到其他goroutine呢?context.Context就是用來傳遞類似上下文信息的結(jié)構(gòu)

ctx, cancel := context.WithCancelCause(ctx) // 新增
defer cancel(nil) // 新增

for i, word := range words {
    tokens <- struct{}{}
    wg.Add(1)

    go func(word string, i int) {
        defer func() {
            wg.Done()
            <-tokens
        }()

        result, e := search(ctx, word)
        if e != nil {
            once.Do(func() {
                err = e
                cancel(e) // 新增
            })

            return
        }

        results[i] = result
    }(word, i)
}

wg.Wait()

完整的代碼

最終完成的效果如下

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 select {
 case <-ctx.Done():
  return "", ctx.Err()
 default:
  if word == "Go" || word == "Java" {
   return "", errors.New("Go or Java")
  }
  return fmt.Sprintf("result: %s", word), nil // 模擬結(jié)果
 }
}

func coSearch(ctx context.Context, words []string) ([]string, error) {
 ctx, cancel := context.WithCancelCause(ctx)
 defer cancel(nil)

 var (
  wg   = sync.WaitGroup{}
  once = sync.Once{}

  results = make([]string, len(words))
  tokens  = make(chan struct{}, 2)

  err error
 )

 for i, word := range words {
  tokens <- struct{}{}
  wg.Add(1)

  go func(word string, i int) {
   defer func() {
    wg.Done()
    <-tokens
   }()

   result, e := search(ctx, word)
   if e != nil {
    once.Do(func() {
     err = e
     cancel(e)
    })

    return
   }

   results[i] = result
  }(word, i)
 }

 wg.Wait()

 return results, err
}

并發(fā)控制庫errgroup

可以看到要實(shí)現(xiàn)一個(gè)較為完備的并發(fā)控制,需要做的工作非常多。不過Go官方團(tuán)隊(duì)為大家準(zhǔn)備了 golang.org/x/sync/errgroup

errgroup提供的能力和上文的示例類似,實(shí)現(xiàn)方式也類似,包含并發(fā)控制,錯(cuò)誤傳遞,context.Context傳遞等

package main

import (
 "context"
 "fmt"
 "sync"

 "golang.org/x/sync/errgroup"
)

func coSearch(ctx context.Context, words []string) ([]string, error) {
 g, ctx := errgroup.WithContext(ctx)
 g.SetLimit(10)
 
 results := make([]string, len(words))

 for i, word := range words {
  i, word := i, word

  g.Go(func() error {
   result, err := search(ctx, word)
   if err != nil {
    return err
   }

   results[i] = result
   return nil
  })
 }

 err := g.Wait()

 return results, err
}

errgroup的用法也很簡(jiǎn)單

  • 使用 g, ctx := errgroup.WithContext(ctx)來創(chuàng)建goroutine的管理器
  • g.SetLimit()可以設(shè)置允許的最大的goroutine數(shù)量
  • 類似于go關(guān)鍵詞, g.Go異步執(zhí)行函數(shù)
  • g.Wait()和sync.WaitGroup{}的wg.Wait()類似,會(huì)阻塞直到所有g(shù)oroutine都運(yùn)行完成,并返回其中一個(gè)goroutine的錯(cuò)誤

利用golang.org/x/sync/errgroup大幅簡(jiǎn)化了進(jìn)行并發(fā)控制的邏輯,真是一個(gè)并發(fā)控制的利器啊!

總結(jié)

本篇從基礎(chǔ)的sync.WaitGroup{}庫出發(fā),涉及到了并發(fā)安全、sync.Once等內(nèi)容。最后介紹了并發(fā)控制的利器:golang.org/x/sync/errgroup。

雖然使用Go語言能夠非常簡(jiǎn)單的編寫并發(fā)程序,但其中要注意的細(xì)節(jié)非常多,忽略這些細(xì)節(jié)不僅沒有提升程序運(yùn)行的效率,還會(huì)產(chǎn)生錯(cuò)誤的結(jié)果

責(zé)任編輯:武曉燕 來源: 涼涼的知識(shí)庫
相關(guān)推薦

2021-07-28 08:32:58

Go并發(fā)Select

2024-09-06 12:52:59

2021-04-20 11:40:47

指針類型CPU

2018-07-03 15:20:36

Promise函數(shù)借錢

2025-03-28 08:30:00

PythonPandasaxis

2009-08-10 10:08:45

.NET調(diào)用PHP W

2021-06-24 06:35:00

Go語言進(jìn)程

2009-06-11 14:48:48

jbpm工作流引擎jbpm例子

2009-06-18 15:53:37

Hibernate B

2024-01-25 11:41:00

Python開發(fā)前端

2023-03-14 08:02:14

靜態(tài)路由動(dòng)態(tài)路由設(shè)備

2021-03-24 06:06:13

Go并發(fā)編程Singlefligh

2025-05-28 02:00:00

AI智能體文本

2023-11-06 13:55:59

聯(lián)合索引MySQ

2020-09-06 22:59:35

Linux文件命令

2023-05-25 08:02:09

構(gòu)建工具源碼JS

2021-07-09 06:11:37

Java泛型Object類型

2020-03-26 09:18:54

高薪本質(zhì)因素

2024-06-17 08:40:16

2023-01-30 16:21:24

Linux外觀
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 日本欧美视频 | 亚洲成人中文字幕 | 69av在线视频| 日日噜噜夜夜爽爽狠狠 | www.日韩| 夜夜爽99久久国产综合精品女不卡 | 一级片在线免费播放 | 国产亚洲精品综合一区 | 亚洲精品天堂 | 婷婷五月色综合 | 日韩中文字幕 | 免费小视频在线观看 | 精品国产一区二区在线 | 国产精品永久在线观看 | 成人日韩av | 日韩激情免费 | 中文字幕亚洲欧美 | 台湾a级理论片在线观看 | 1204国产成人精品视频 | 中文字幕精品视频 | 日韩精品久久久久 | 国产精品久久久久久久久久免费看 | 美女天天操 | 午夜欧美a级理论片915影院 | av黄色在线观看 | 国产精品久久久久久久久久久免费看 | 九九免费在线视频 | 精品乱码一区二区三四区 | 午夜a区 | 综合国产第二页 | 日本超碰 | 精品在线播放 | 国产精品久久久久久婷婷天堂 | 国产日韩欧美综合 | 青青久久 | 欧美一级淫片免费视频黄 | 国产欧美一区二区三区在线看蜜臀 | 九九综合 | 97久久精品午夜一区二区 | 2020国产在线 | 日韩乱码av |