成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Golang 優雅關閉 gRPC 實踐

開發 后端
本文主要討論了在 Go 語言中實現gRPC服務優雅關閉的技術和方法,從而確保所有連接都得到正確處理,防止數據丟失或損壞。

問題

我在上次做技術支持的時候,遇到了一個有趣的錯誤。我們的服務在 Kubernetes 上運行,有一個容器在重啟時不斷出現以下錯誤信息--"Error bind: address already in use"。對于大多數程序員來說,這是一個非常熟悉的錯誤信息,表明一個進程正試圖綁定到另一個進程正在使用的端口上。

一、背景

我的團隊維護一個 Go 服務,啟動時會在各自的 goroutine 中生成大量不同的 gRPC 服務。

Goroutine[2] - Go 運行時管理的輕量級線程,運行時只需要幾 KB 內存,是 Go 并發性的基礎。

以下是我們服務架構的簡化版本,以及以前啟動和停止服務器時所執行的任務。

package main

type GrpcServerInterface interface{
  Run(stopChan chan <-struct{})
}

type Server struct {
  ServerA GrpcServerIface
  ServerB GrpcServerIface
}

func NewServer() *Server {
  return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(stopChan <-chan struct{}){
  go ServerA.Run(stopChan)
  go ServerB.Run(stopChan)
  <- stopChan
}

func main() {
  stopChan := make(chan struct{})
  server := NewServer()
  server.Start(stopChan)
 
  // Wait for program to terminate and then signal servers to stop
  ch := make(chan os.Signal, 1)
  signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  <-ch
  close(stopChan)
}
package internal

type ServerA struct {
  stopChan <-chan struct{}
}

// Start runs each of the grpc servers
func (s *ServerA) Run(stopChan <-chan struct{}){
  grpcServer := grpc.NewServer()
  
  var listener net.Listener
  ln, err := net.Listen("tcp", ":8080")
  if err != nil {
   // handle error
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     return 
   }
  }
  
  <- stopChan
  grpcServer.Stop() // Gracefully terminate connections and close listener
}

我首先想到這可能是 Docker 或 Kubernetes 運行時的某種偶發性錯誤。這個錯誤讓我覺得很奇怪,原因如下:1.)查看代碼,我們似乎確實在主程序退出時關閉了所有監聽,端口怎么可能在重啟時仍在使用?2.)錯誤信息持續出現了幾個小時,以至于需要人工干預。我原以為在最壞情況下,操作系統會在嘗試重啟容器之前為我們清理資源。或許是清理速度不夠快?

團隊成員建議我們再深入調查一下。

二、解決方案

經過仔細研究,發現我們的代碼實際上存在一些問題...

1. 通道(Channel)與上下文(Context)

通道用于在程序之間發送信號,通常以一對一的方式使用,當一個值被發送到某個通道時,只能從該通道讀取一次。在我們的代碼中,使用的是一對多模式。我們將在 main 中創建的通道傳遞給多個不同的 goroutine,每個 goroutine 都在等待 main 關閉通道,以便知道何時運行清理函數。

從 Go 1.7 開始,上下文被認為是向多個 goroutine 廣播信號的標準方式。雖然這可能不是我們遇到問題的根本原因(我們是在等待通道關閉,而不是試圖讓每個 goroutine 從通道中讀取相同的值),但考慮到這是最佳實踐,還是希望采用這種模式。

以下是從通道切換到上下文后更新的代碼。

package internal

type ServerA struct {}

func (s *ServerA) Run(ctx context.Context){
  grpcServer := grpc.NewServer()
  var listener net.Listener
  ln, err := net.Listen("tcp", ":8080")
  if err != nil {
   log.Fatal("ServerA - Failed to create listener")
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     log.Fatal("ServerA - Failed to start server") 
   }
  }
  
  <- ctx.Done()
  // Clean up logic 
  grpcServer.Stop() // Gracefully terminate connections and close listener
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 stopServer context.CancelFunc
 serverCtx context.Context
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  // create new context from parent context
  s.serverCtx, stopServer := context.WithCancel(ctx) 
  go ServerA.Run(s.serverCtx)
  go ServerB.Run(s.serverCtx)
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}

2. 基于等待組(WaitGroup)的優雅停機

雖然我們通過取消主上下文向 goroutine 發出了退出信號,但并沒有等待它們完成工作。當主程序收到退出信號時,即使我們發送了取消信號,也不能保證它會等待生成的 goroutine 完成工作。因此我們必須明確等待每個 goroutine 完成工作,以避免任何泄漏,為此我們使用了 WaitGroup。

WaitGroup[3] 是一種計數器,用于阻止函數(或者說是 goroutine)的執行,直到其內部計數器變為 0。

package internal

type ServerA struct {}

func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
  wg.Add(1) // Add the current function to the parent's wait group
  defer wg.Done() // Send "done" signal upon function exit
  
  grpcServer := grpc.NewServer()
  var listener net.Listener
  ln, err := net.Listen("tcp", ":8080")
  if err != nil {
   log.Fatal("ServerA - Failed to create listener")
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     log.Fatal("ServerA - Failed to start server") 
   }
  }
  
  <- ctx.Done()
  // Clean up logic 
  grpcServer.Stop() // Gracefully terminate connections and close listener
  fmt.Println("ServerA has stopped")
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 wg sync.WaitGroup
 stopServer context.CancelFunc
 serverCtx context.Context
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  s.serverCtx, stopServer := context.WithCancel(ctx)
  go ServerA.Run(s.serverCtx, &s.wg)
  go ServerB.Run(s.serverCtx, &s.wg)
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
  s.wg.Wait()  // wait for all goroutines to exit before returning
  fmt.Println("Main Server has stopped")
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}

3. 基于通道的啟動信號

在測試過程中,又發現了一個隱藏錯誤。我們未能在接受流量之前等待所有服務端啟動,而這在測試中造成了一些誤報,即流量被發送到服務端,但沒有實際工作。為了向主服務發送所有附屬服務都已準備就緒的信號,我們使用了通道。

package internal

type ServerA struct {
  startChan  
}

func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
  wg.Add(1) // Add the current function to the parent's wait group
  defer wg.Done() // Send "done" signal upon function exit
   
  go func(){
    grpcServer := grpc.NewServer()
    
    var listener net.Listener
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
     log.Fatal("ServerA - Failed to create listener")
    }
    
    for {
     err := grpcServer.Serve(listener)
     if err != nil {
       log.Fatal("ServerA - Failed to start server") 
     }
    }
    close(s.startChan) // Signal that we are done starting server to exit function
    // Wait in the background for mina program to exit
    <- ctx.Done()
    // Clean up logic 
    grpcServer.Stop() // Gracefully terminate connections and close listener
    fmt.Println("ServerA has stopped")
  }()
  <- s.StartChan // Wait for signal before exiting function
  fmt.Println("ServerA has started")
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 wg sync.WaitGroup
 stopServer context.CancelFunc
 serverCtx context.Context
 startChan chan <-struct{}
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
    startChan: make(chan <-struct{}),
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  s.serverCtx, stopServer := context.WithCancel(ctx)
  ServerA.Run(s.serverCtx, &s.wg)
  ServerB.Run(s.serverCtx, &s.wg)
  close(s.startChan)
  <- s.startChan // wait for each server to Start before returning
  fmt.Println("Main Server has started")
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
  s.wg.Wait()  // wait for all goroutines to exit before returning
  fmt.Println("Main Server has stopped")
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}

三、結論

不瞞你說,剛開始學習 Go 時,并發會讓你頭疼不已。調試這個問題讓我有機會看到這些概念的實際用途,并強化了之前不確定的主題,建議你自己嘗試簡單的示例!

參考資料:

  • [1]Go Concurrency — Graceful Shutdown: https:/medium.com/@goldengirlgeeks/go-graceful-shutdown-0c46e67ab9c9
  • [2]Goroutine: https://go.dev/tour/concurrency/1
  • [3]WaitGroup: https://www.geeksforgeeks.org/using-waitgroup-in-golang/amp
責任編輯:趙寧寧 來源: DeepNoMind
相關推薦

2021-09-13 05:02:49

GogRPC語言

2021-09-26 10:20:06

開發Golang代碼

2021-01-19 10:35:49

JVM場景函數

2021-04-20 08:00:31

Redisson關閉訂單支付系統

2021-09-01 23:29:37

Golang語言gRPC

2023-12-05 07:26:21

Golang項目結構

2024-04-02 09:55:36

GolangColly開發者

2021-06-04 10:52:51

kubernetes場景容器

2017-12-19 10:03:44

JavaLinux代碼

2024-04-28 18:24:05

2022-04-11 08:17:07

JVMJava進程

2024-11-13 16:37:00

Java線程池

2022-02-20 23:15:46

gRPCGolang語言

2020-11-23 14:16:42

Golang

2022-04-29 11:52:02

API代碼HTTP

2021-03-28 09:17:18

JVM場景鉤子函數

2024-10-21 15:39:24

2024-05-28 00:00:30

Golang數據庫

2024-01-07 12:47:35

Golang流水線設計模式

2018-12-17 16:44:49

Golang微服務
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 成人在线播放网站 | 一区二区三区小视频 | 中文字幕视频在线观看免费 | 成人综合久久 | 国产精品亚洲一区二区三区在线观看 | 男女羞羞在线观看 | 国产精品久久久久久久久久免费看 | 国产精品视频一区二区三区 | 亚洲成人一区二区三区 | 成人一区二区三区视频 | 欧美一级在线免费 | 最新免费黄色网址 | 国产成人精品久久二区二区91 | 日本免费视频在线观看 | 超碰成人免费观看 | 亚洲精品福利视频 | 国产高清免费 | 狠狠草视频| 欧美爱爱视频 | 91久色 | 91在线一区二区三区 | 国产精品黄色 | 久久久久久久久淑女av国产精品 | 国产一区视频在线 | 日韩欧美高清dvd碟片 | 日本久久久久久 | 国产欧美日韩久久久 | 久草视频网站 | 日本啊v在线| 一区二区三区免费 | 韩日一区二区三区 | 99re热这里只有精品视频 | 亚洲精品性视频 | 国产在线精品一区二区三区 | 国产在线精品一区二区三区 | 在线色网| 97色在线观看免费视频 | 黄色毛片在线播放 | 国产精品久久久久久久午夜片 | 国产一区二区在线播放视频 | 亚洲免费在线观看 |