Dom*_*aja 2 java project-reactor grpc grpc-java
我想从gRPC StreamObserver创建Reactor Flux。只要 StreamObserver 没有本地实现相应的接口(参见例如这个问题),就需要这样做。
我想出的内容大致如下:
final StreamObserver<ProtoResponse>[] streamObserverArray = new StreamObserver[1];
Flux<Response> myFlux Flux.create(sink -> streamObserverArray[0] = new StreamObserver<ProtoResponse>() {
@Override
public void onNext(ProtoResponse value) {
final Response response = convertFromProto(value);
sink.next(response);
}
@Override
public void onError(Throwable throwable) {
sink.error(throwable);
}
@Override
public void onCompleted() {
sink.complete();
}
});
myFlux
.doOnError(throwable -> {/* actual logic in here */}) //
.doOnComplete(() -> {/* actual logic in here */}) //
.doOnCancel(() -> {/* actual logic in here */}) //
.parallel() //
.runOn(Schedulers.parallel()) //
.doOnNext(/* actual heavy lifting logic in here */) //
.map(/* ... */) //
.sequential() //
.doOnNext(/* ...*/) //
.subscribe(); // needed to start the actual processing of the events on this Flux
MyGrpcService.newStub(channel).getResponses(protoRequest, streamObserverArray[0]);
Run Code Online (Sandbox Code Playgroud)
我想在这里使用 Reactor 的主要想法是将“繁重工作”并行分配到多个线程上,而不是在 gRPC 请求线程上执行此操作。
我发现上面的方法有几个问题:
StreamObserver[]方法.subscribe(),则StreamObserver可能是nullgRPC 开始通信时(也称为竞争条件)。所以我的问题是:从 gRPC StreamObserver 桥接到 Reactor Flux 的最佳/首选方法是什么?有没有最佳实践?
| 归档时间: |
|
| 查看次数: |
2803 次 |
| 最近记录: |