Fin*_*Fin · 15 · Tags: network-programming, go, kubernetes, grpc
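I have a Go gRPC client connected to a gRPC server that runs in a different pod in a k8s cluster.
It works well: it receives and processes requests.
I'm now wondering how best to make it resilient when the gRPC server pod is recycled.
As far as I can tell, the clientconn.go code should handle reconnection automatically, but I just can't get it to work, and I'm worried my implementation is wrong in the first place.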
Calling code from main:
```go
go func() {
	if err := gRPCClient.ProcessRequests(); err != nil {
		log.Error("Error while processing requests")
		// do something here??
	}
}()
```
My code in the gRPCClient wrapper module:
```go
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()

	for {
		request, err := grpcclient.stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			// When the pod is recycled, this is what gets hit, with err:
			//   rpc error: code = Unavailable desc = transport is closing
			// What is the correct recovery pattern here, so that we can wait for
			// the connection and then continue processing requests?
			// Should I return err here and somehow restart the ProcessRequests()
			// goroutine in the main function?
			break
		}
		log.Info("Request received")
		// The happy path: process the received request here.
		_ = request
	}
	return nil
}

func (grpcclient *gRPCClient) Close() {
	// This is called soon after the connection drops.
	grpcclient.conn.Close()
}
```
EDIT: Emin Laletovic kindly answered my question below, and that covers most of it. I had to make a few changes to the waitUntilReady function:
```go
func (grpcclient *gRPCClient) waitUntilReady() bool {
	// How long to wait for the connection to be restored before giving up.
	timeout := 300 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	currentState := grpcclient.conn.GetState()
	stillConnecting := true
	for currentState != connectivity.Ready && stillConnecting {
		// Returns true when the state has changed from currentState, false on timeout.
		stillConnecting = grpcclient.conn.WaitForStateChange(ctx, currentState)
		currentState = grpcclient.conn.GetState()
		log.WithFields(log.Fields{"state": currentState, "timeout": timeout}).Info("Attempting reconnection. State has changed to:")
	}

	if !stillConnecting {
		log.Error("Connection attempt has timed out.")
		return false
	}
	return true
}
```
Answer: Emi*_*vic · 13
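The RPC connection is handled automatically by clientconn.go, but that does not mean the stream is handled automatically too.
Once a stream is broken, whether because the RPC connection dropped or for some other reason, it cannot reconnect by itself. After the RPC connection has been restored, you need to obtain a new stream from the server.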
Pseudocode that waits for the RPC connection to reach the READY state and then establishes a new stream might look like this:
```go
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()

	go grpcclient.process()
	for {
		select {
		case <-grpcclient.reconnect:
			if !grpcclient.waitUntilReady() {
				return errors.New("failed to establish a connection within the defined timeout")
			}
			go grpcclient.process()
		case <-grpcclient.done:
			return nil
		}
	}
}

func (grpcclient *gRPCClient) process() {
	reqclient := GetStream() // always get a new stream
	for {
		request, err := reqclient.stream.Recv()
		if err == io.EOF {
			grpcclient.done <- true
			return
		}
		if err != nil {
			grpcclient.reconnect <- true
			return
		}
		log.Info("Request received")
		// The happy path: process the received request here.
		_ = request
	}
}

func (grpcclient *gRPCClient) waitUntilReady() bool {
	// How long to wait for the connection to be restored before giving up.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()
	// NOTE: this returns as soon as the state differs from Ready, which is not
	// the same as waiting for Ready; see the edit below.
	return grpcclient.conn.WaitForStateChange(ctx, connectivity.Ready)
}
```
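EDIT:
Revisiting the code above, a few mistakes need correcting. The WaitForStateChange function waits for the connection state to change *from* the state you pass in; it does not wait for the connection to change *to* that state.
It is better to track the connection's current state and to use the Connect function to start connecting when the channel is idle.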
```go
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()

	go grpcclient.process()
	for {
		select {
		case <-grpcclient.reconnect:
			if !grpcclient.isReconnected(1*time.Second, 60*time.Second) {
				return errors.New("failed to establish a connection within the defined timeout")
			}
			go grpcclient.process()
		case <-grpcclient.done:
			return nil
		}
	}
}

// isReconnected polls the connection every check interval, asking an idle
// channel to connect, and reports whether it reached READY before timeout.
func (grpcclient *gRPCClient) isReconnected(check, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	ticker := time.NewTicker(check)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			grpcclient.conn.Connect() // only starts connecting if the channel is IDLE
			if grpcclient.conn.GetState() == connectivity.Ready {
				return true
			}
		case <-ctx.Done():
			return false
		}
	}
}
```