GRPC异步响应流C#

War*_*ick 6 c# grpc

如何从处理程序外部生成RPC的流响应值?(具体来说,来自IObservable)我目前正在执行以下操作,但这是创建跨线程问题,因为AnRxObservable在RPC处理程序之间共享...

public override Task GetTicker(RequestProto request, ServerCallContext context)
{
    var subscription = AnRxObservable.Subscribe(value =>
    {
        responseStream.WriteAsync(new ResponseProto
        {
            Value = value
        });
    });

    // Wait for the RPC to be canceled (my extension method
    // that returns a task that completes when the CancellationToken
    // is cancelled)
    await context.CancellationToken.WhenCancelled();

    // Dispose of the buffered stream
    bufferedStream.Dispose();

    // Dispose subscriber (tells rx that we aren't subscribed anymore)
    subscription.Dispose();

    return Task.FromResult(1);
}
Run Code Online (Sandbox Code Playgroud)

这段代码感觉不对......但我看不到从RPC处理程序外部创建的共享源流式传输RPC响应的任何其他方式.

Jan*_*sch 5

一般而言,当您尝试从推入模型​​(IObservable)转换为拉入模型(枚举要写入和写入的响应)时,需要消息的中间缓冲区-例如blockingQueue。然后,处理程序主体可以是一个异步循环,该循环尝试为队列获取下一条消息(最好以异步方式)并将其写入responseStream。

另外,请注意,gRPC API仅允许您在任何给定时间获得1个进行中的响应-而您的代码段对此并不尊重。因此,您需要在开始另一个写操作之前等待WriteAsync()(这是您需要中间队列的另一个原因)。

该链接可能对解释push vs pull范例很有用:什么时候使用IEnumerable vs IObservable?