Reactor中有没有办法忽略错误信号?

Sta*_*lfi 3 java project-reactor

我有多个URL和端口的数组。对于他们每个人,我需要发送和接收一些东西:

Flux.fromArray(trackersArray)
    .flatMap(tracker -> 
               ConnectToTracker.connect(tracker.getTracker(), tracker.getPort()))
Run Code Online (Sandbox Code Playgroud)

我与服务器进行通信,UDP因此除非我发送了一条消息,该消息“根据某些规则需要响应”,否则我无法确定服务器是否处于活动状态。

ConnectToTracker.connectonNext如果服务器响应,则可以发送信号onError;例如,如果服务器不响应(SocketTimeOutException)或任何其他故障(常规IOException),则可以发送信号。

我不想终止flux例如,如果onError信号等于SocketTimeOutException相反,我想尝试与我得到的每个跟踪器进行通信。

该链接包含我可以用来处理错误但不能忽略它们的所有操作

Reactor 3如果这很重要,我正在使用。

更新:

我做了一个丑陋的把戏,但是有效:

Flux.fromArray(trackersArray)
        .handle((Tracker tracker, SynchronousSink<ConnectResponse> sink) -> {
            ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
                    .subscribe(sink::next, error -> {
                        if (!(error instanceof SocketTimeoutException))
                            sink.error(error);
                    }, sink::complete);
        })
Run Code Online (Sandbox Code Playgroud)

如果您有更好的选择,请填写。

Sim*_*slé 8

由于您已经在平面图中处理网址,请使用onErrorResume(e -> Mono.empty())。这将使Flatmap忽略该错误。编辑:在平面图中,在lambda的右侧

  • 如果源已经被完全处理,那么它就会完成,这就是 Flux 的全部精神。如果 6 个元素中的第 4 个错误,则上述平面图中的解决方案将跳过该元素并继续处理元素 5 (2认同)

Wes*_*Gun 6

现在我们有了reactor.core.publisher.onErrorContinue()in 版本3.3.2,它允许您onNext()在某些元素为 时发送信号onError()。使用log()你会看得更清楚。

签名是(throwable, instance)这样,如果你想记录出错的一个,很有用。

Flux.fromIterable(aList)
    .flatMap(this::xxxx)
    .onErrorContinue((throwable, o) -> {
        log.error("Error while processing {}. Cause: {}", o, throwable.getMessage());
})
    ....
Run Code Online (Sandbox Code Playgroud)

  • @Konrad,通量的作用是忽略异常。因此,如果您需要测试通量,请在“this.xxx()”内抛出异常并检查“StepVerifier”是否成功完成。 (2认同)

小智 5

Flux.fromArray(trackersArray)
.flatMap(tracker -> 
           ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
                .onErrorResume(SocketTimeoutException.class, __ -> Mono.empty()))
Run Code Online (Sandbox Code Playgroud)

也许这样做更好,它会从 SocketTimeOut 恢复,如果异常是其他的,我会选择 onError