Sta*_*lfi 3 java project-reactor
我有多个URL和端口的数组。对于他们每个人,我需要发送和接收一些东西:
Flux.fromArray(trackersArray)
.flatMap(tracker ->
ConnectToTracker.connect(tracker.getTracker(), tracker.getPort()))
Run Code Online (Sandbox Code Playgroud)
我与服务器进行通信,UDP因此除非我发送了一条消息,该消息“根据某些规则需要响应”,否则我无法确定服务器是否处于活动状态。
ConnectToTracker.connectonNext如果服务器响应,则可以发送信号onError;例如,如果服务器不响应(SocketTimeOutException)或任何其他故障(常规IOException),则可以发送信号。
我不想终止flux例如,如果onError信号等于SocketTimeOutException。相反,我想尝试与我得到的每个跟踪器进行通信。
该链接包含我可以用来处理错误但不能忽略它们的所有操作。
Reactor 3如果这很重要,我正在使用。
更新:
我做了一个丑陋的把戏,但是有效:
Flux.fromArray(trackersArray)
.handle((Tracker tracker, SynchronousSink<ConnectResponse> sink) -> {
ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
.subscribe(sink::next, error -> {
if (!(error instanceof SocketTimeoutException))
sink.error(error);
}, sink::complete);
})
Run Code Online (Sandbox Code Playgroud)
如果您有更好的选择,请填写。
由于您已经在平面图中处理网址,请使用onErrorResume(e -> Mono.empty())。这将使Flatmap忽略该错误。编辑:在平面图中,在lambda的右侧
现在我们有了reactor.core.publisher.onErrorContinue()in 版本3.3.2,它允许您onNext()在某些元素为 时发送信号onError()。使用log()你会看得更清楚。
签名是(throwable, instance)这样,如果你想记录出错的一个,很有用。
Flux.fromIterable(aList)
.flatMap(this::xxxx)
.onErrorContinue((throwable, o) -> {
log.error("Error while processing {}. Cause: {}", o, throwable.getMessage());
})
....
Run Code Online (Sandbox Code Playgroud)
小智 5
Flux.fromArray(trackersArray)
.flatMap(tracker ->
ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
.onErrorResume(SocketTimeoutException.class, __ -> Mono.empty()))
Run Code Online (Sandbox Code Playgroud)
也许这样做更好,它会从 SocketTimeOut 恢复,如果异常是其他的,我会选择 onError
| 归档时间: |
|
| 查看次数: |
3003 次 |
| 最近记录: |