我正在尝试使用服务流和客户端上的异步存根在 grpc 上设置一个简单的发布/订阅模式。在将部分流消息实现回客户端后,我想处理连接断开的情况。现在我正在执行部分服务,例如关闭服务并且客户端应该从连接丢失中“恢复”。
我已经在 google/github/so 上阅读并搜索了有关重试机制的信息,并最终为流消息的服务中的方法设置了重试策略。据我了解,当服务返回重试策略中定义的一些 retryableStatusCodes 时,重试机制应该起作用。在客户端引入重试策略后,我想对其进行测试,以下两个场景的结果让我对重试感到困惑。
第一个场景:
第二种情况:
总的来说,让我感到困惑的是为什么这两种场景之间的行为存在差异?为什么在第一个场景中检测到服务器返回 UNAVAILABLE 并尝试重试,但在第二个场景中,即使状态相同,重试也不起作用?
以下是客户端连接调用、服务连接方法和客户端重试策略设置的代码
client:
messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
@Override
public void onNext(MessageResponse messageResponse) {
//process new message
MessageDto message = new MessageDto();
message.setBody(messageResponse.getBody());
message.setTitle(messageResponse.getTitle());
messageService.broadcastMessage(message);
}
@Override
public void onError(Throwable throwable) {
//service went down
LOGGER.error(throwable.getStackTrace());
}
@Override
public void onCompleted() {
//This method should be called when user logs out of the application
LOGGER.info(String.format("Message streaming terminated for user %d", userId));
}
});
Run Code Online (Sandbox Code Playgroud)
service:
@Override
public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
Long userId = request.getUserId();
ServerCallStreamObserver<MessageResponse > serverCallStreamObserver =
(ServerCallStreamObserver<MessageResponse >) responseObserver;
serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
registerClient(userId, serverCallStreamObserver);
//responseObserver.onCompleted() is left out so connection is not terminated
}
@EventListener
public void listenForMessages(MessageEvent messageEvent) {
//omitted code (just some data retrieving - populate conn and message vars)....
MessageResponse.Builder builder = MessageResponse.newBuilder();
StreamObserver<MessageResponse> observer = conn.getResponseObserver();
builder.setType(message.getType());
builder.setTitle(message.getTitle());
builder.setBody(message.getBody());
observer.onNext(builder.build())
}
Run Code Online (Sandbox Code Playgroud)
retryPolicy:
{
"methodConfig" : [
{
"name": [
{
"service": "ch.example.proto.MessageService",
"method": "connect"
}
],
"retryPolicy": {
"maxAttempts": 10,
"initialBackoff": "5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}
]
}
Run Code Online (Sandbox Code Playgroud)
问题是接收消息会提交RPC。这在gRFC A6 客户端重试中进行了讨论。它提到了Response-Headers,当服务器响应第一条消息时隐式发送。
本质上,一旦 gRPC 将数据传回客户端,就无法自动重试。如果 gRPC 重试,它应该如何将新流与它已经响应的流结合起来?它应该跳过第一个N响应吗?但是如果现在的反应不同呢?元数据(通过 传递Response-Headers)的问题更严重,因为这些元数据无法再次提供给客户端。
gRPC 能够将客户端的请求重播到多个后端,但是一旦它开始接收来自后端的响应,它将“固定”到该后端并且无法更改其决定。
您将需要应用程序级别的重试来重新建立流。当客户端重新建立流时,它可能需要修改请求以通知服务器客户端已经收到了哪些消息。
| 归档时间: |
|
| 查看次数: |
3189 次 |
| 最近记录: |