如何确保在 gRPC 双向流式传输中收到消息?

Hug*_*ugo 5 streaming bidirectional go grpc

我如何知道另一端已收到我通过 gRPC 流发送的消息?

是否有一种构建方法可以在 gRPC 双向流式传输中执行此操作,或者我是否需要仅使用流式传输,然后发回响应?

原型文件:


service SimpleService {
rpc SimpleRPC (stream SimpleData) returns (stream SimpleData) {}
}

message SimpleData {
string msg = 1;
}
Run Code Online (Sandbox Code Playgroud)

去代码:


client := pb.NewSimpleServiceClient(conn)
stream, err := client.SimpleRPC(context.Background())
waitc := make(chan struct{})

msg := &pb.SimpleData{"sup"}
go func() {
for {
   stream.Send(msg)
    }
}()
<-waitc
stream.CloseSend()

Run Code Online (Sandbox Code Playgroud)

Dan*_*Dan 2

不幸的是,gRPC 不能很好地处理这种情况。它也不处理单向流,甚至不处理单个 RPC 请求。失败模式(没有流的简单情况)是这样的:

  • gRPC 客户端向服务器发送请求,超时时间为T
  • 服务器成功响应客户端T-1sec
  • 将消息发送给客户端和/或解析它需要2sec,将其放在T+1sec(超过超时期限)
  • 客户端在该时间之前的请求超时T,并看到错误

最好的解决方案是找出一种方法来避免这样做,从服务器的角度来看,使协议无状态——换句话说,如果客户端看到失败,让他们重试。或者,您可以反转连接的方向,以便“服务器”将 RPC 发送到客户端,然后客户端以 ack 进行响应。

困难的地方是服务器存储的内容取决于客户端的状态。例如,假设服务器有一个有序的数据队列,它试图将其发送到客户端,并且它需要知道客户端在从队列中删除项目之前看到的最后一件事是什么。这里的选项是:

  • 如果您有一个 RPC,请将其转换为(非常短暂的)双向 RPC 流,其中客户端可以发送一个ClientRequest内部为对象的oneof对象ClientActualRequest或一个空ClientSuccess对象,表明客户端看到了该消息。然后,在客户端的实现中,让它先发送一个ClientRequest { ClientActualRequest {...} },在接收处理来自服务器的响应后,让它发送一个ClientRequest { ClientSuccess {} },然后终止流。(当然,服务器也必须知道如何处理。)
  • 如果您在任一/两个方向上进行流传输,就会变得更加困难。在最简单的情况下,您可以执行与上面的单个 RPC 情况相同的操作,其中首先客户端发送真正的请求,然后在服务器的每次更新时发送确认。重要的是,客户端必须按顺序处理流,服务器也必须按顺序处理确认,否则服务器可能会认为客户端已经看到了所有消息,而实际上它只看到了一部分消息。(请注意,gRPC 保证按顺序交付,因此只有在客户端实现中进行重新排序才会导致问题;可能只有在使用异步 gRPC API 时才有可能,除非您正在做一些非常奇怪的事情。)将会是,您可以让客户端last_known_sequence_number作为协议的一部分发送到服务器,而不是使用 ack 消息,并且服务器只能安全地删除直到该序列号的项目;但是,如果您这样做,则没有简单的方法可以传回更复杂的状态,因为它只是一个序列号。
  • 如果在客户端上进行有序处理不可行,那么您必须完全疯狂。现在,您的协议需要服务器在队列中的每条消息上放置序列号,并且您的客户端ClientSuccess对象需要对您正在确认的序列号进行编码,以便服务器知道它可以删除哪些内容。这基本上是 TCP 内部所做的事情,但当然只是在从服务器到客户端的方向上进行确认……显然,如果您将解决方案与 TCP 进行比较,我们正在谈论一些非常复杂的东西。

请记住,在所有这些情况下,同样,即使 RPC 成功,也不能保证客户端会看到服务器的 ack,在我们的新协议中,服务器也不能保证客户端会看到客户端的 ack。因此,您的客户端需要能够在连接重置时处理查看重复的请求(最多可达您一次允许的任何数量的正在进行的请求)。除非你实现“ack of an ack”,那就是:-P。