使用 gRPC 的观察者模式 - C#

drO*_*iTs 1 c# observer-pattern grpc

对不起,如果这是一个愚蠢的问题,但我在互联网上找不到任何有用的信息。

有没有人尝试过使用 gRPC 作为通信在 C# 中实现观察者模式?如果是,请告诉我链接。

非常感谢,并致以最诚挚的问候。

Rob*_*win 5

我已经实现了一个客户端便利类包装器,将服务器流调用转换为我正在工作的项目的常规事件。不确定这是否是您所追求的。这是一个简单的 gRPC 服务器,它每秒将时间作为字符串发布一次。

syntax = "proto3";
package SimpleTime;

service SimpleTimeService
{
   rpc MonitorTime(EmptyRequest) returns (stream TimeResponse);
}

message EmptyRequest{}

message TimeResponse
{
   string time = 1;
}
Run Code Online (Sandbox Code Playgroud)

服务器实现,它每秒只循环一次,返回当前时间的字符串表示,直到取消,如下所示

public override async Task MonitorTime(EmptyRequest request, IServerStreamWriter<TimeResponse> responseStream, ServerCallContext context)
{
   try
   {
      while (!context.CancellationToken.IsCancellationRequested)
      {
         var response = new TimeResponse
         {
            Time = DateTime.Now.ToString()
         };
         await responseStream.WriteAsync(response);
         await Task.Delay(1000);
      }
   }
   catch (Exception)
   { 
      Console.WriteLine("Exception on Server");
   }
}
Run Code Online (Sandbox Code Playgroud)

对于客户端,我创建了一个包含 gRPC 客户端的类,并将服务器流式 MonitorTime 调用的结果公开为一个普通的 ole .net 事件。

   public class SimpleTimeEventClient
   {
      private SimpleTime.SimpleTimeService.SimpleTimeServiceClient mClient = null;
      private CancellationTokenSource mCancellationTokenSource = null;
      private Task mMonitorTask = null;
      public event EventHandler<string> OnTimeReceived;

      public SimpleTimeEventClient()
      {
         Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure);
         mClient = new SimpleTime.SimpleTimeService.SimpleTimeServiceClient(channel);
      }

      public void Startup()
      {
         mCancellationTokenSource = new CancellationTokenSource();
         mMonitorTask = Task.Run(() => MonitorTimeServer(mCancellationTokenSource.Token));
      }

      public void Shutdown()
      {
         mCancellationTokenSource.Cancel();
         mMonitorTask.Wait(10000);
      }

      private async Task MonitorTimeServer(CancellationToken token)
      {
         try
         {
            using (var call = mClient.MonitorTime(new SimpleTime.EmptyRequest()))
            {
               while(await call.ResponseStream.MoveNext(token))
               {
                  var timeResult = call.ResponseStream.Current;
                  OnTimeReceived?.Invoke(this, timeResult.Time);
               }
            }
         }
         catch(Exception e)
         {
            Console.WriteLine($"Exception encountered in MonitorTimeServer:{e.Message}");
         }
      }
   }
Run Code Online (Sandbox Code Playgroud)

现在创建客户端并订阅事件。

  static void Main(string[] args)
  {
     SimpleTimeEventClient client = new SimpleTimeEventClient();
     client.OnTimeReceived += OnTimeReceivedEventHandler;
     client.Startup();
     Console.WriteLine("Press any key to exit");
     Console.ReadKey();
     client.Shutdown();

  }

  private static void OnTimeReceivedEventHandler(object sender, string e)
  {
     Console.WriteLine($"Time: {e}");
  }
Run Code Online (Sandbox Code Playgroud)

当运行产生时

在此处输入图片说明

我省略了很多错误检查,以便使示例更小。我所做的一件事是对于具有许多服务器流调用的 gRPC 接口,这些调用可能会或可能不会对调用客户端感兴趣,是实现事件访问器(添加,删除)以仅在有客户端时调用服务器端流方法已订阅包装事件。希望这有帮助