来自 gRPC StreamObserver 的“桥接”Reactor 的 Flux

Dom*_*aja 2 java project-reactor grpc grpc-java

我想从gRPC StreamObserver创建Reactor Flux。只要 StreamObserver 没有本地实现相应的接口(参见例如这个问题),就需要这样做。

我想出的内容大致如下:

final StreamObserver<ProtoResponse>[] streamObserverArray = new  StreamObserver[1];
Flux<Response> myFlux Flux.create(sink -> streamObserverArray[0] = new StreamObserver<ProtoResponse>() {
        @Override
        public void onNext(ProtoResponse value) {
            final Response response = convertFromProto(value);
            sink.next(response);
        }

        @Override
        public void onError(Throwable throwable) {
            sink.error(throwable);
        }

        @Override
        public void onCompleted() {
            sink.complete();
        }
    });
myFlux
    .doOnError(throwable -> {/* actual logic in here */}) //
    .doOnComplete(() -> {/* actual logic in here */}) //
    .doOnCancel(() -> {/* actual logic in here */}) //
    .parallel() //
    .runOn(Schedulers.parallel()) //
    .doOnNext(/* actual heavy lifting logic in here */) //
    .map(/* ... */) //
    .sequential() //
    .doOnNext(/* ...*/) //
    .subscribe(); // needed to start the actual processing of the events on this Flux

MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverArray[0]);
Run Code Online (Sandbox Code Playgroud)

我想在这里使用 Reactor 的主要想法是将“繁重工作”并行分配到多个线程上,而不是在 gRPC 请求线程上执行此操作。

我发现上面的方法有几个问题:

  • 我真的不喜欢数组的解决StreamObserver[]方法
  • 我需要首先创建完整的通量,因为如果我没有首先完成它.subscribe(),则StreamObserver可能是nullgRPC 开始通信时(也称为竞争条件)。
  • 我不确定背压是否按预期方式工作(尽管目前这不是我主要关心的问题)。

所以我的问题是:从 gRPC StreamObserver 桥接到 Reactor Flux 的最佳/首选方法是什么?有没有最佳实践?

Ste*_*n L 5

现在有一个更简单的解决方案:

https://github.com/salesforce/reactive-grpc

它支持将 gRPC 桥接到 Reactor 和 RxJava 2。