如何在 Project Reactor 中实现轮询逻辑?

Cow*_*Zow 5 spring project-reactor

我有一个方法发送请求以获取作业状态并返回状态,如下所示:

Mono<JobStatus> getJobStatus() {...}

JobStatus对象有一个方法JobStatus.isDone()可以返回挂起的作业是否完成。

有没有办法让我继续订阅单声道直到它成为JobStatus.isDone()现实?即类似的东西getJobStatus().keepSubscribingUntil(status -> status.isDone())

Sim*_*slé 11

一种选择是getJobStatus() Mono仅在工作完成时发出,这可能并不容易,具体取决于Mono当前的实现方式。

对于轮询,假设Mono每次订阅时都会进行轮询,您可以repeatWhen与以下命令配对使用takeUntil

getJobStatus()
    .repeatWhen(completed -> completed.delayElements(Duration.ofMillis(pollDelay))) //(1)
    .takeUntil(JobStatus::isDone) //(2)
    .last() //(3)
Run Code Online (Sandbox Code Playgroud)

(1) 重复重新订阅源Mono(这会产生Flux<JobStatus>

(2) 一旦返回状态标记为done,就取消上述重复循环

(3) 切换回Mono<JobStatus>发出最后一次迭代状态的 a (因此标记为完成的那个)

  • 嘿,关于 `.repeatWhen(...)` 行,应该是: `.repeatWhen(completed -&gt; Completed.delayElements(Duration.ofMillis(pollDelay))` 吗?即,repeatWhen 内的通量必须发出许多元素,而不是只有一个 Mono.delay 只会发出一次? (2认同)