如何获取Flux中引发异常的元素?

Kam*_*mil 6 java reactive-programming project-reactor

假设我有一个 ids: 数组[9, 8, 7, 6]

我做了一些处理,一个元素导致抛出异常。我想以自己的方式处理这种情况(比如说记录它)并让其他元素顺其自然。

我怎么知道是哪一个?我需要在onError处理中包含这个元素。

Flux.fromArray(myArray)
  .flatMap(element -> {
    var foo = processMyEl(element);  
    return anotherProcess(foo); // this returns Mono
  })
  .onErrorOperator(element -> handleMyError(element)) // this line is what I need
  
Run Code Online (Sandbox Code Playgroud)

所以,我看到,这几乎很好.onErrorContinue((error, obj) ->,它发出一个错误和一个对象。

但这obj不是element导致异常的对象,而是造成异常的对象。它发生在我的处理方法内部,并且不必每次都是同一类型的对象。

.onErrorReturn(...)- 不是我真正想要的

.doOnError(error ->- 没有我的元素的信息

.onErrorResume(error ->- 与上面相同

有人建议我可以创建自己的异常并将元素传递到那里,然后从异常中检索它。但我应该如何抛出异常呢?

我应该采用旧的尝试捕获方式吗:

Flux.fromArray(myArray)
  .flatMap(el -> {
    try {
      var foo = processMyEl(el);  
      return anotherProcess(foo); // this returns Mono
    } catch (Exception e) {
      return Mono.error(new MyException(el));
     }
    })
  .onErrorOperator(error -> handleMyError(error.getElement()))
Run Code Online (Sandbox Code Playgroud)

看起来不太好

编辑:

不仅看起来很糟糕,而且不起作用。异常根本没有被捕获,直接触发doOnTerminate()并停止整个流

更新:

感谢@JEY 我.onErrorResume()在里面使用了flatMap.

我还将第一个方法转变为反应流Mono.defer(() -> Mono.just(processMyEl(el)))

正如注释:使用Mono.defer()允许我使用onErrorResume因为Mono.just() 不能发出错误信号。

最终代码如下所示:

Flux.fromArray(myArray)
    .flatMap(element -> Mono.defer(() -> Mono.just(processMyEl(element)))
        .onErrorResume(th -> handleMyError(element, th))
    )
    .flatMap(foo -> anotherProcess(foo)
        .onErrorResume(th -> handleMyError(foo, th)
    )
Run Code Online (Sandbox Code Playgroud)

在哪里:

private Mono<> handleMyError(el, th) {
  // handling code
  return Mono.empty()
}
Run Code Online (Sandbox Code Playgroud)

JEY*_*JEY 3

根据@Kamil 的要求,我将添加我的评论作为答案:

您应该只处理 flatMap 中的错误并返回 Mono.empty() 来丢弃它,执行以下操作:

Flux.fromArray(myArray)
  .flatMap(el -> anotherProcess(processMyEl(el)).onErrorResume(th -> handleError(th, el))
Run Code Online (Sandbox Code Playgroud)

处理错误如下:

Mono<Void> handleError(Throwable th, Object element) {
    LOG.error("An error occurred on {}", element, th);
    return Mono.empty()
}
Run Code Online (Sandbox Code Playgroud)

或者,如果您想做一些需要异步的更复杂的事情:

Mono<Void> handleError(Throwable th, Object element) {
    return doSomethingThaReturnFluxOrMono(element).then();
}
Run Code Online (Sandbox Code Playgroud)