gRPC 推送和扇出

Sve*_*dos 6 .net grpc

是否有可能使用 gRPC 作为具有扇出功能的推送服务?在Google给出的示例中,服务器端(C#)有以下代码:

    public override async Task ListFeatures(Rectangle request, IServerStreamWriter<Feature> responseStream, ServerCallContext context)
    {
        var responses = features.FindAll( (feature) => feature.Exists() && request.Contains(feature.Location) );
        foreach (var response in responses)
        {
            await responseStream.WriteAsync(response);
        }
    }
Run Code Online (Sandbox Code Playgroud)

问题就在这里:

  1. 仅当客户端明确要求时才会生成和写入数据。
  2. 只有提出请求的客户端才会获得新数据。

我想我需要的是:

  1. 保留每个请求(订阅)的客户端的所有 IServerStreamWriter。
  2. 当新数据可用时,通过外部事件触发写入。
  3. 写入所有streamWriter

编辑: 根据卡尔的建议,我现在有以下内容:

原型:

service PubSub {
 rpc Subscribe(Subscription) returns (stream Event) {}
 rpc Unsubscribe(Subscription) returns (Unsubscription) {}
}

message Event
{
   string Value = 1;
}
message Subscription
{
  string Id = 1;
}
message Unsubscription
{
  string Id = 1;
}
Run Code Online (Sandbox Code Playgroud)

发布订阅实现:

public class PubSubImpl : PubSub.PubSubBase
{
    private readonly BufferBlock<Event> _buffer = new BufferBlock<Event>();

    private Dictionary<string, IServerStreamWriter<Event>> _subscriberWritersMap =
        new Dictionary<string, IServerStreamWriter<Event>>();

     public override async Task Subscribe(Subscription subscription, IServerStreamWriter<Event> responseStream, ServerCallContext context)
    {
        //Dict to hold a streamWriter for each subscriber.
        _subscriberWritersMap[subscription.Id] = responseStream;

        while (_subscriberWritersMap.ContainsKey(subscription.Id))
        {
            //Wait on BufferBlock from MS Dataflow package.
            var @event = await _buffer.ReceiveAsync();
            foreach (var serverStreamWriter in _subscriberWritersMap.Values)
            {
                await serverStreamWriter.WriteAsync(@event);
            }
        }
    }

    public override Task<Unsubscription> Unsubscribe(Subscription request, ServerCallContext context)
    {
        _subscriberWritersMap.Remove(request.Id);
        return Task.FromResult(new Unsubscription() { Id = request.Id });
    }

    public void Publish(string input)
    {
        _buffer.Post(new Event() { Value = input });
    }
}
Run Code Online (Sandbox Code Playgroud)

现在可以像这样发送“Push”:

   while ((input = Console.ReadLine()) != "q")
    {
      pubsubImp.Publish(input);
    }
Run Code Online (Sandbox Code Playgroud)

在客户端我有:

public async Task Subscribe()
{
    _subscription = new Subscription() { Id = Guid.NewGuid().ToString() };
    using (var call = _pubSubClient.Subscribe(_subscription))
    {
        //Receive
        var responseReaderTask = Task.Run(async () =>
        {
            while (await call.ResponseStream.MoveNext())
            {
                Console.WriteLine("Event received: " + call.ResponseStream.Current);
            }
        });

        await responseReaderTask;
    }
}

public void Unsubscribe()
{
    _pubSubClient.Unsubscribe(_subscription);
}
Run Code Online (Sandbox Code Playgroud)

客户端主程序的工作方式如下:

static void Main(string[] args)
{
    var channel = new Channel("127.0.0.1:50052", 
                                ChannelCredentials.Insecure);
    var subscriber = new Subsriber(new PubSub.PubSubClient(channel));

    Task.Run(async () =>
    {
        await subscriber.Subscribe();
    }).GetAwaiter();

    Console.WriteLine("Hit key to unsubscribe");
    Console.ReadLine();

    subscriber.Unsubscribe();

    Console.WriteLine("Unsubscribed...");

    Console.WriteLine("Hit key to exit...");
    Console.ReadLine();

}
Run Code Online (Sandbox Code Playgroud)

目前看来它有效。这是应该/可以这样做的吗?测试解决方案可以在以下位置找到: https://github.com/KingKnecht/gRPC-PubSub

Aar*_*don 5

我认为您不需要在服务器应用程序中跟踪客户端。相反,对于每个“推送”方法,创建一个EventWaitHandle并持续等待它。然后在另一个上下文中,向客户端发出信号EventWaitHandle,以便等待的“推送”方法可以WriteAsync发送给客户端。

例如

public class PubSubImpl : PubSub.PubSubBase
{
    EventWaitHandle evwStatus = new EventWaitHandle(false, EventResetMode.ManualReset);
    string status;

    public override async Task Subscribe(Subscription subscription, IServerStreamWriter<Event> responseStream, ServerCallContext context)
    {
        // respond with the current status
        await serverStreamWriter.WriteAsync(new Event { Value = status });
        // wait until we're signaled with a different status
        while(evwStatus.WaitOne())
        {
            await serverStreamWriter.WriteAsync(new Event { Value = status });
        }
    }   

    public void Publish(string input)
    {
        status = input;
        // let the waiting threads respond
        evwStatus.Set();
        // halt the waiting threads
        evwStatus.Reset();
    }
}
Run Code Online (Sandbox Code Playgroud)


Car*_*elo 2

这是可能的,但我无法评论这是否是一个好的做法。正如您所说,您需要跟踪每个客户端,并确保在客户端断开连接而无法访问时将其删除。

你的一般方法听起来是正确的。推送 RPC 应该是双向流式传输(假设每个客户端都可以发起推送)。当客户端连接到服务器时,将客户端记录在线程安全集合中。当其中一个客户端发送消息时,循环访问将消息发送到每个连接的客户端的集合。如果发送失败,请从客户端移除客户端并关闭连接。

示例中有一个更简单的版本,称为“RouteGuide”,它用 gRPC 支持的语言实现了一个简单的聊天服务器。