Observable的doOnError正确位置

Tav*_*avo 11 java rx-java

我有点新鲜Observers,我仍然想弄明白.我有以下代码:

observableKafka.getRealTimeEvents()
        .filter(this::isTrackedAccount)
        .filter(e -> LedgerMapper.isDepositOrClosedTrade((Transaction) e.getPayload()))
        .map(ledgerMapper::mapLedgerTransaction)
        .map(offerCache::addTransaction)
        .filter(offer -> offer != null)  // Offer may have been removed from cache since last check
        .filter(Offer::isReady)
        .doOnError(throwable -> { 
              LOG.info("Exception thrown on realtime events");
          })
        .forEach(awardChecker::awardFailOrIgnore);
Run Code Online (Sandbox Code Playgroud)

getRealTimeEvents()返回一个Observable<Event>.

.doOnError问题的位置是什么?另外,在这段代码中添加多个调用会产生什么影响?我已经意识到我可以做到并且所有这些都被调用,但我不确定它的目的是什么.

aka*_*okd 23

是的,它确实.doOnError当错误在该特定点通过流时起作用,因此如果操作符在doOnErrorthrow 之前,则将调用您的操作.但是,如果doOnError进一步放置,可能会也可能不会调用它,具体取决于链中的下游运营商.

特定

Observer<Object> ignore = new Observer<Object>() {
    @Override public void onCompleted() {
    }
    @Override public void onError(Throwable e) {
    }
    @Override public void onNext(Object t) {
    }
};
Run Code Online (Sandbox Code Playgroud)

例如,以下代码将始终调用doOnError:

Observable.<Object>error(new Exception()).doOnError(e -> log(e)).subscribe(ignore);
Run Code Online (Sandbox Code Playgroud)

但是,此代码不会:

Observable.just(1).doOnError(e -> log(e))
.flatMap(v -> Observable.<Integer>error(new Exception())).subscribe(ignore);
Run Code Online (Sandbox Code Playgroud)

大多数运营商都会反弹源自下游的异常.

doOnError如果通过onErrorResumeNext或转换异常,添加multipe 是可行的onExceptionResumeNext:

Observable.<Object>error(new RuntimeException())
.doOnError(e -> log(e))
.onErrorResumeNext(Observable.<Object>error(new IllegalStateException()))
.doOnError(e -> log(e)).subscribe(ignore);
Run Code Online (Sandbox Code Playgroud)

否则,您将在链的多个位置记录相同的异常.


Sim*_*slé 5

doOn???有副作用的方法,处理并不是真正的核心业务价值。日志记录是一个很好的用途。也就是说,有时您想对错误执行更有意义的操作,例如重试或向用户显示消息等。对于这些情况,“ rx”方式将是处理subscribe呼叫中的错误。

doOnError(以及其他doOn方法)将原始文件包装Observable到新文件中,并为其添加行为(onError显然是在其方法周围)。这就是为什么您可以多次调用它的原因。能够在链中任何地方调用它的另一个好处是,您可以访问流的使用者(Subscriber)隐藏的错误,例如,因为链中有重试...