反应堆.单声道列表,重试失败

Ser*_*hko 0 reactor project-reactor

我有清单List<Mono<String>>.每个Mono代表API调用,我在I/O上等待结果.问题是有些调用没有返回任何内容(空字符串),我需要在这种情况下再次重复它们.

现在它看起来像这样:

val firstAskForItemsRetrieved = firstAskForItems.map {
    it["statistic"] = (it["statistic"] as Mono<Map<Any, Any>>).block()
    it
}
Run Code Online (Sandbox Code Playgroud)

我正在等待所有Monos完成,然后在空身的情况下我重复请求

val secondAskForItem = firstAskForItemsRetrieved
        .map {
            if ((it["statistic"] as Map<Any, Any>).isEmpty()) {
                // repeat request 
                it["statistic"] = getUserItem(userName) // return Mono
            } else
                it["statistic"] = Mono.just(it["statistic"])
            it
        }
Run Code Online (Sandbox Code Playgroud)

然后再次阻止每个项目

val secondAskForItemsRetrieved = secondAskForItems.map {
    it["statistic"] = (it["statistic"] as Mono<Map<Any, Any>>).block()
    it
}
Run Code Online (Sandbox Code Playgroud)

我觉得这看起来很难看

  • 有没有其他方法可以在Mono中重试呼叫,如果它失败了,而不是手动操作?

  • 它是否阻止每个项目以正确的方式获取它们?

  • 如何使代码更好?

谢谢.

Sim*_*slé 5

我相信有2个运营商可以帮助您:

  • 对于"等待所有Mono"用例,请看一下静态方法whenzip.

    • when只关心完成,所以即使单声道是空的,onComplete只要所有的单声道都完成,它就会发出信号.但是,您无法获取数据.
    • zip关心价值观并期望所有Monos都受到重视.当所有Monos都被重视时,它会根据传递的值组合它们的值Function.否则它只是完成空.
  • 要重试空Monos,请看一下repeatWhenEmpty.它重新订阅为空Mono,所以如果它Mono是"冷",它将重新启动源(例如,发出另一个HTTP请求).