使用以下原型缓冲区代码:
syntax = "proto3";
package pb;
message SimpleRequest {
int64 number = 1;
}
message SimpleResponse {
int64 doubled = 1;
}
// All the calls in this serivce preform the action of doubling a number.
// The streams will continuously send the next double, eg. 1, 2, 4, 8, 16.
service Test {
// This RPC streams from the server only.
rpc Downstream(SimpleRequest) returns (stream SimpleResponse);
}
Run Code Online (Sandbox Code Playgroud)
我能够成功打开一个流,并不断从服务器获取下一个双倍的数字。
我的运行代码如下所示:
ctxDownstream, cancel := context.WithCancel(ctx)
downstream, err := testClient.Downstream(ctxDownstream, &pb.SimpleRequest{Number: 1})
for {
responseDownstream, err := downstream.Recv()
if err != io.EOF {
println(fmt.Sprintf("downstream response: %d, error: %v", responseDownstream.Doubled, err))
if responseDownstream.Doubled >= 32 {
break
}
}
}
cancel() // !!This is not a graceful shutdown
println(fmt.Sprintf("%v", downstream.Trailer()))
Run Code Online (Sandbox Code Playgroud)
我遇到的问题是使用上下文取消意味着我的下游.Trailer() 响应为空。有没有办法从客户端优雅地关闭此连接并接收下游.Trailer()。
注意:如果我从服务器端关闭下游连接,我的预告片就会被填充。但我无法指示我的服务器端关闭这个特定的流。因此必须有一种方法可以优雅地关闭流客户端。
谢谢。
根据要求一些服务器代码:
func (b *binding) Downstream(req *pb.SimpleRequest, stream pb.Test_DownstreamServer) error {
request := req
r := make(chan *pb.SimpleResponse)
e := make(chan error)
ticker := time.NewTicker(200 * time.Millisecond)
defer func() { ticker.Stop(); close(r); close(e) }()
go func() {
defer func() { recover() }()
for {
select {
case <-ticker.C:
response, err := b.Endpoint(stream.Context(), request)
if err != nil {
e <- err
}
r <- response
}
}
}()
for {
select {
case err := <-e:
return err
case response := <-r:
if err := stream.Send(response); err != nil {
return err
}
request.Number = response.Doubled
case <-stream.Context().Done():
return nil
}
}
}
Run Code Online (Sandbox Code Playgroud)
您仍然需要在预告片中填充一些信息。我使用 grpc.StreamServerInterceptor 来执行此操作。
根据 grpc go 文档
Trailer 从服务器返回预告片元数据(如果有)。 它只能在stream.CloseAndRecv返回后调用,或者stream.Recv返回非nil错误(包括io.EOF)。
因此,如果您想在客户端中阅读预告片,请尝试这样的操作
ctxDownstream, cancel := context.WithCancel(ctx)
defer cancel()
for {
...
// on error or EOF
break;
}
println(fmt.Sprintf("%v", downstream.Trailer()))
Run Code Online (Sandbox Code Playgroud)
当出现错误时中断无限循环并打印预告片。cancel将在函数结束时调用,因为它被延迟了。
| 归档时间: |
|
| 查看次数: |
4092 次 |
| 最近记录: |