我有一个 Go gRPC 客户端连接到在 k8s 集群中的不同 pod 中运行的 gRPC 服务器。
它运行良好,可以接收和处理请求。
我现在想知道在 gRPC 服务器 Pod 被回收的情况下如何最好地实现弹性。
据我所知,clientconn.go 代码应该自动处理重新连接,但我就是无法让它工作,我担心我的实现在第一个实例中是不正确的。
从 main 调用代码:
go func() {
if err := gRPCClient.ProcessRequests(); err != nil {
log.Error("Error while processing Requests")
//do something here??
}
}()
Run Code Online (Sandbox Code Playgroud)
我在 gRPCClient 包装器模块中的代码:
func (grpcclient *gRPCClient) ProcessRequests() error {
defer grpcclient.Close()
for {
request, err := reqclient.stream.Recv()
log.Info("Request received")
if err == io.EOF {
break
}
if err != nil {
//when pod is recycled, this is what's hit …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用服务流和客户端上的异步存根在 grpc 上设置一个简单的发布/订阅模式。在将部分流消息实现回客户端后,我想处理连接断开的情况。现在我正在执行部分服务,例如关闭服务并且客户端应该从连接丢失中“恢复”。
我已经在 google/github/so 上阅读并搜索了有关重试机制的信息,并最终为流消息的服务中的方法设置了重试策略。据我了解,当服务返回重试策略中定义的一些 retryableStatusCodes 时,重试机制应该起作用。在客户端引入重试策略后,我想对其进行测试,以下两个场景的结果让我对重试感到困惑。
第一个场景:
第二种情况:
总的来说,让我感到困惑的是为什么这两种场景之间的行为存在差异?为什么在第一个场景中检测到服务器返回 UNAVAILABLE 并尝试重试,但在第二个场景中,即使状态相同,重试也不起作用?
以下是客户端连接调用、服务连接方法和客户端重试策略设置的代码
client:
messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
@Override
public void onNext(MessageResponse messageResponse) {
//process new message
MessageDto message = new MessageDto();
message.setBody(messageResponse.getBody());
message.setTitle(messageResponse.getTitle());
messageService.broadcastMessage(message);
}
@Override
public void onError(Throwable throwable) {
//service went down
LOGGER.error(throwable.getStackTrace());
}
@Override …Run Code Online (Sandbox Code Playgroud)