如何在不为空的情况下重复 Mono

Ram*_*ash 2 project-reactor spring-webflux

我有一个像这样返回的方法!

Mono<Integer> getNumberFromSomewhere();
Run Code Online (Sandbox Code Playgroud)

我需要继续调用它,直到它没有更多的项目可以发出。那就是我需要将其设为Flux<Integer>.

一种选择是添加repeat. 重点是 - 我想在上述方法发出第一个空信号时停止。

有什么办法可以做到这一点吗?我正在寻找一种干净的方法。

Sim*_*slé 6

执行此操作的内置运算符(尽管它旨在用于“更深”的嵌套)是expand

expand当返回Publisher完成为空时自然停止扩展。

您可以将其应用到您的用例中,如下所示:

//this changes each time one subscribes to it
Mono<Integer> monoWithUnderlyingState;

Flux<Integer> repeated = monoWithUnderlyingState
    .expand(i -> monoWithUnderlyingState);
Run Code Online (Sandbox Code Playgroud)