Spring WebFlux(反应器)。zipWith 时出错 - 由于缺少请求而无法发出滴答声

Ale*_*zel 3 project-reactor

我有一个 Flux,对于每个对象,我应该对第三方 REST 进行 API 调用(大约 1000 次调用)。为了防止每秒出现许多请求,我使用:

    Flux<Calls> callsIntervalFlux=
            Flux.interval(Duration.ofMillis(100))
                    .zipWith(callsFlux, (i, call) -> call);

// and now Calls emits every 10ms, and REST API is not overloaded
Run Code Online (Sandbox Code Playgroud)

问题是,有时应用程序会因异常而失败:

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Caused by: reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Run Code Online (Sandbox Code Playgroud)

有没有我可以添加的逻辑来防止错误,或者只是跳过这个勾号?

Sim*_*slé 5

这意味着结果的消费者没有足够快地消耗数据:在interval固定频率上,它试图发射但没有人听。

我认为需要在 Reactor 上构建某种更高级的基于许可的速率限制器。但与此同时,您可以尝试的另一种简单(简单化?)方法是单独确保每个调用都比前一个调用延迟 10 毫秒:

Flux<Calls> callsIntervalFlux = callsFlux.delayElements(Duration.ofMillis(10));
Run Code Online (Sandbox Code Playgroud)

(此运算符用于替换zipWith(interval)模式)