如何在grpc中正确设计发布-订阅模式?

Dmi*_*kin 3 java publish-subscribe grpc

我正在尝试使用grpc来实现pub sub模式,但是我对如何正确地做到这一点感到困惑。

我的原型: rpc call (google.protobuf.Empty) returns (stream Data);

客户:

asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {
         @Override
         public void onNext(Data value) {
           // process a data

         @Override
         public void onError(Throwable t) {

         }

         @Override
         public void onCompleted() {

         }
       });

   } catch (StatusRuntimeException e) {
     LOG.warn("RPC failed: {}", e.getStatus());
   }

   Thread.currentThread().join();
Run Code Online (Sandbox Code Playgroud)

服务器服务:

public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {
  private final BlockingQueue<Data> queue;
  private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();

  public Sender(BlockingQueue<Data> queue) {
    this.queue = queue;
  }

  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        // waiting for first element
        Data data = queue.take();
        // send head element
        observers.forEach(o -> o.onNext(data));

      } catch (InterruptedException e) {
        LOG.error("error: ", e);
        Thread.currentThread().interrupt();
      }
    }
  }
}

Run Code Online (Sandbox Code Playgroud)

如何正确地从全球观察员中删除客户?连接断开时如何接收某种信号?
如何管理客户端-服务器重新连接?连接断开时如何强制客户端重新连接?

提前致谢!

Car*_*elo 5

在执行您的服务时:

  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }
Run Code Online (Sandbox Code Playgroud)

您需要获取当前请求的上下文,并监听cancel。对于单请求,多响应调用(即服务器流),gRPC生成的代码已简化为直接传递请求。这意味着您无权直接访问底层ServerCall.Listener,这通常是您侦听客户端断开连接和取消连接的方式。

而是,每个gRPC调用都Context与之关联,该调用携带取消和其他请求范围的信号。对于您的情况,您只需要通过添加自己的侦听器来侦听取消,然后将其安全地从链接的哈希集中删除响应观察器。


至于重新连接:如果连接断开,gRPC客户端将自动重新连接,但除非安全,否则通常不会重试RPC。对于服务器流式RPC,通常这样做是不安全的,因此您需要直接在客户端上重试RPC。

  • 是的。Google 的 Cloud Pubsub 位于 gRPC 之上,其客户端源代码在 GitHub 上是公开的。你可以看看它的灵感。 (3认同)