成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

聊聊 Go 流水線編程模式

開發 后端
流水線工作模型在工業領域內十分常見,它將工作流程分為多個環節,每個環節根據工作強度安排合適的人員數量。良好的流水線設計盡量讓各環節的流通率平衡,最大化提高產能效率。

[[433764]]

文末本文轉載自微信公眾號「Golang技術分享」,作者機器鈴砍菜刀 。轉載本文請聯系Golang技術分享公眾號。

流水線工作模型在工業領域內十分常見,它將工作流程分為多個環節,每個環節根據工作強度安排合適的人員數量。良好的流水線設計盡量讓各環節的流通率平衡,最大化提高產能效率。

Go 是一門實用性語言,流水線工作模型與 Go 融合地非常融洽,只不過我們一般使用另一個名詞來表示流水線:pipeline。

pipeline

pipeline 由多個環節組成,具體在 Go 中,環節之間通過 channel 通信,同一個環節任務可以由多個 goroutine 來同時處理。

pipeline

pipeline 的核心是數據,通過 channel 來保證數據流動,每個環節的數據處理由 goroutine 完成。

除了開始環節和結束環節,每個環節都有任意數量的輸入 channel 和輸出 channel。開始環節被稱為發送者或生產者,結束環節被稱為接收者或消費者。

下面我們來看一個簡單的 pipeline 例子,分為三個環節。

第一個環節,generate 函數:它充當生產者角色,將數據寫入 channel,并把該 channel 返回。當所有數據寫入完畢,關閉 channel。

  1. func generate(nums ...int) <-chan int { 
  2.  out := make(chan int
  3.  go func() { 
  4.   for _, n := range nums { 
  5.    out <- n 
  6.   } 
  7.   close(out
  8.  }() 
  9.  return out 

第二個環節,square 函數:它是數據處理的角色,從開始環節中的 channel 取出數據,計算平方,將結果寫入新的 channel ,并把該新的 channel 返回。當所有數據計算完畢,關閉該新 channel。

  1. func square(in <-chan int) <-chan int { 
  2.  out := make(chan int
  3.  go func() { 
  4.   for n := range in { 
  5.    out <- n * n 
  6.   } 
  7.   close(out
  8.  }() 
  9.  return out 

main 函數負責編排整個 pipeline ,并充當消費者角色:讀取第二個環節的 channel 數據,打印出來。

  1. func main() { 
  2.  // Set up the pipeline. 
  3.  c := generate(2, 3) 
  4.  out := square(c) 
  5.  
  6.  // Consume the output
  7.  for n := range out { 
  8.   fmt.Println(n) 
  9.  } 

Fan-out,fan-in

在上述例子中,環節之間通過非緩沖的 channel 傳遞數據,節點中的數據都是單個 goroutine 處理與消費。

這種工作模式并不高效,會讓整個流水線的效率取決于最慢的環節。因為每個環節中的任務量是不同的,這意味著我們需要的機器資源是存在差異的。任務量小的環節,盡量占有少量的機器資源,任務量重的環節,需要更多線程并行處理。

以汽車組裝為例,我們可以將組裝輪胎的工作分發給 4 個人一起干,當輪胎組裝完畢之后,再交由剩下的環節。

多個 goroutine 可以從同一個 channel 讀取數據,直到該通道關閉,這稱為 fan-out(扇出)。

這個稱呼比較形象,它將數據進行分散,所以被稱為扇出。扇出是一種分發任務的模式。

fan-out

單個 goroutine 可以從多個輸入 channel 中讀取數據,直到所有輸入都關閉。具體做法是將輸入 channel 多路復用到同一個 channel 上,當所有輸入 channel 都關閉時,該 channel 也關閉,這稱為 fan-in(扇入)。

它將數據進行聚合,所以被稱為扇入。扇入是一種整合任務結果的模式。

fan-in

在汽車組裝的例子中,分發輪胎任務給每個人是 Fan-out,合并輪胎組裝結果就是 Fan-in。

channel 的多路復用

扇出的編碼模型比較簡單,本文不多研究,我們提供一個扇入編程示例。

創建一個生成器函數 generate,通過 interval 參數控制消息生成頻率。生成器返回消息 channel mc與停止 channel sc,停止 channel 用于停止生成器任務。

  1. func generate(message string, interval time.Duration) (chan string, chan struct{}) { 
  2.  mc := make(chan string) 
  3.  sc := make(chan struct{}) 
  4.  
  5.  go func() { 
  6.   defer func() { 
  7.    close(sc) 
  8.   }() 
  9.  
  10.   for { 
  11.    select { 
  12.    case <-sc: 
  13.     return 
  14.    default
  15.     time.Sleep(interval) 
  16.  
  17.     mc <- message 
  18.    } 
  19.   } 
  20.  }() 
  21.  
  22.  return mc, sc 

stopGenerating 函數通過通過向 sc 中傳入空結構體,通知 generate退出,調用 close(mc) 關閉消息 channel

  1. func stopGenerating(mc chan string, sc chan struct{}) { 
  2.  sc <- struct{}{} 
  3.  
  4.  close(mc) 

多路復用函數 multiplex 創建并返回整合消息 channel 和控制并發的 wg。

  1. func multiplex(mcs ...chan string) (chan string, *sync.WaitGroup) { 
  2.  mmc := make(chan string) 
  3.  wg := &sync.WaitGroup{} 
  4.  
  5.  for _, mc := range mcs { 
  6.   wg.Add(1) 
  7.  
  8.   go func(mc chan string, wg *sync.WaitGroup) { 
  9.    defer wg.Done() 
  10.  
  11.    for m := range mc { 
  12.     mmc <- m 
  13.    } 
  14.   }(mc, wg) 
  15.  } 
  16.  
  17.  return mmc, wg 

在 main 函數中,創建兩個消息 channel 并復用它們生成 mmc ,打印來自 mmc 的每條消息。另外,我們還實現了接收系統斷信號(終端上執行 CTRL+C 即可發送中斷信號)的優雅的關閉機制。

  1. func main() { 
  2.  // create two sample message and stop channels 
  3.  mc1, sc1 := generate("message from generator 1", 200*time.Millisecond) 
  4.  mc2, sc2 := generate("message from generator 2", 300*time.Millisecond) 
  5.  
  6.  // multiplex message channels 
  7.  mmc, wg1 := multiplex(mc1, mc2) 
  8.  
  9.  // create errs channel for graceful shutdown 
  10.  errs := make(chan error) 
  11.  
  12.  // wait for interrupt or terminate signal 
  13.  go func() { 
  14.   sc := make(chan os.Signal, 1) 
  15.   signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM) 
  16.   errs <- fmt.Errorf("%s signal received", <-sc) 
  17.  }() 
  18.  
  19.  // wait for multiplexed messages 
  20.  wg2 := &sync.WaitGroup{} 
  21.  wg2.Add(1) 
  22.  go func() { 
  23.   defer wg2.Done() 
  24.  
  25.   for m := range mmc { 
  26.    fmt.Println(m) 
  27.   } 
  28.  }() 
  29.  
  30.  // wait for errors 
  31.  if err := <-errs; err != nil { 
  32.   fmt.Println(err.Error()) 
  33.  } 
  34.  
  35.  // stop generators 
  36.  stopGenerating(mc1, sc1) 
  37.  stopGenerating(mc2, sc2) 
  38.  wg1.Wait() 
  39.  
  40.  // close multiplexed messages channel 
  41.  close(mmc) 
  42.  wg2.Wait() 

總結

本文簡單介紹了流水線編程模式,它和我們熟悉的生產者-消費者模式非常相似。

具體到 Go 編程實踐中,pipeline 將數據流分為多個環節,channel 用于數據流動,goroutine 用于處理數據。fan-out 用于分發任務,fan-in 用于數據整合,通過 FAN 模式可以讓流水線更好地并發。

當然,還有些細節需要注意,例如停止通知機制,可參照本文 channel 的多路復用章節示例中的 stopGenerating 函數;如何通過 sync.WaitGroup 做好并發控制,這些都是需要讀者在實際編碼中去體會掌握的。

參考

Go Concurrency Patterns: Pipelines and cancellation:https://go.dev/blog/pipelines

 

Multiplexing Channels In Go:https://medium.com/@ermanimer/multiplexing-channels-in-go-a7dccdcc4134

 

責任編輯:武曉燕 來源: Golang技術分享
相關推薦

2023-05-10 15:08:00

Pipeline設計模式

2024-01-07 12:47:35

Golang流水線設計模式

2017-03-02 14:12:13

流水線代碼Clojure

2017-02-28 16:00:45

DevOpsMarkdownreST

2022-07-18 06:05:28

Gitlab流水線

2017-02-28 15:40:30

Docker流水線Azure

2013-06-06 09:31:52

2021-06-26 14:22:34

Tekton流水線Kubernetes

2022-01-26 08:12:42

Jenkins開源流水線

2023-08-18 10:24:52

GitLabCI 流水線

2021-06-28 06:32:46

Tekton Kubernetes Clone

2021-12-24 08:02:48

GitLabCI模板庫流水線優化

2023-09-27 08:24:49

2021-06-18 05:48:02

Tekton DevopsKubernetes

2021-01-05 08:39:51

容器前端流水線

2019-11-07 09:00:39

Jenkins流水線開源

2018-10-23 16:35:19

華為云

2012-04-19 11:44:52

iPhone

2020-06-16 10:20:32

JavaStream流水線

2011-10-19 08:04:12

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 免费能直接在线观看黄的视频 | 韩国欧洲一级毛片 | 亚洲午夜精品一区二区三区 | 欧美成人一区二区 | 亚洲国产成人在线观看 | 亚洲永久免费 | 成人免费三级电影 | 国产精品中文字幕在线播放 | 久久丝袜 | 一区二区在线不卡 | 精品日本中文字幕 | 欧美日韩成人网 | 国产精品久久久久久久久久尿 | 国际精品鲁一鲁一区二区小说 | 我我色综合 | av一级毛片| aaa级片| 久久国产秒 | 国产精品久久久久久福利一牛影视 | 亚洲欧美日韩一区二区 | 99久久精品免费看国产小宝寻花 | a级大片免费观看 | 亚洲视频在线观看一区二区三区 | 99久久精品免费看国产高清 | 久久夜视频 | 国产成人久久精品 | 欧美亚洲视频 | 日韩精品专区在线影院重磅 | 浴室洗澡偷拍一区二区 | 日韩av一区二区在线观看 | 国产日韩欧美一区 | 一区二区三区日本 | 亚洲欧美日韩电影 | 91亚洲免费 | 欧美成人免费在线视频 | 免费亚洲成人 | 精品无码久久久久久国产 | 久草新视频| 日韩精品一区二区三区在线播放 | 一区二区在线免费观看 | 精品不卡 |