min*_*t3r 2 java project-reactor spring-webflux
我正在尝试实现从 BlockingQueue 创建的 reactor Flux 但不确定哪个运算符最适合我的用例?
我正在创建一个流式 REST 端点,其中响应是 Flux,它需要不断地从 BlockingQueue 发出消息作为对 GET REST 调用的响应。
我已经尝试过论坛和文档,并且只能找到从可迭代集合或反应数据源启动的 Flux,但没有来自任何 BlockingQueue 的示例。
bsi*_*eup 10
您可以尝试Flux#generate和Queue#peek。请记住,如果队列为空,peek它将返回null,并且不能在onNext.
就像是:
Flux.generate(sink -> {
val element = queue.peek();
if (element == null) {
sink.complete();
} else {
sink.next(element);
}
});
Run Code Online (Sandbox Code Playgroud)
还有Flux#repeatWhen运算符,以防您想在队列被视为空后重新订阅队列,例如:
flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3241 次 |
| 最近记录: |