如何从阻塞队列创建反应堆 Flux?

min*_*t3r 2 java project-reactor spring-webflux

我正在尝试实现从 BlockingQueue 创建的 reactor Flux 但不确定哪个运算符最适合我的用例?

我正在创建一个流式 REST 端点,其中响应是 Flux,它需要不断地从 BlockingQueue 发出消息作为对 GET REST 调用的响应。

我已经尝试过论坛和文档,并且只能找到从可迭代集合或反应数据源启动的 Flux,但没有来自任何 BlockingQueue 的示例。

bsi*_*eup 10

您可以尝试Flux#generateQueue#peek。请记住,如果队列为空,peek它将返回null,并且不能在onNext.

就像是:

Flux.generate(sink -> {
    val element = queue.peek();
    if (element == null) {
        sink.complete();
    } else {
        sink.next(element);
    }
});
Run Code Online (Sandbox Code Playgroud)

还有Flux#repeatWhen运算符,以防您想在队列被视为空后重新订阅队列,例如:

flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))
Run Code Online (Sandbox Code Playgroud)

  • @bsideup,为什么这里使用 `peek()` 而不是 `poll()` 在管道化后从队列中删除元素? (5认同)
  • 有效。尽管如此,一旦服务器发出接收完成信号,我的客户端(浏览器)就会不断地一次又一次地发送请求。所以我改为BlockingQueue#take。不确定这如何适合反应式异步处理。 (2认同)
  • 请小心,因为“#take”会阻塞。确保您订阅了阻塞友好的调度程序(例如“Schedulers.elastic()”) (2认同)