在计划任务中使用 Flux

dil*_*ius 4 project-reactor spring-webflux

我正在处理一个 Spring Webflux 项目,并且在尝试在计划任务中发布和使用 Flux 时遇到了问题。

@Scheduled(fixedRate = 20*1000)
fun updateNews() {
    try {
        logger.info("Automatic Update at: ${LocalDateTime.now()}")
        articleRepository.saveAll(
                sourceRepository.findAll().publishOn(Schedulers.parallel())
                        .map { source -> source.generate() }
                        .flatMap { it?.read() ?: Flux.empty() })
                        .timeout(Duration.ofSeconds(20)
        ).subscribeOn(Schedulers.parallel())
    } catch(e: Throwable) {
        logger.log(Level.SEVERE, "Error in Scheduler", e)
    }
}
Run Code Online (Sandbox Code Playgroud)

我配置的调度程序:

ConcurrentTaskScheduler(Executors.newScheduledThreadPool(3))
Run Code Online (Sandbox Code Playgroud)

除非我故意阻止,否则此任务永远不会完成:

.then().block()
Run Code Online (Sandbox Code Playgroud)

我最初没有打扰发布/订阅调度程序的直接引用,我已经尝试了所有看似合理的选项,但没有任何效果。

我的日志事件发生了,但似乎当来自调度程序的此任务的线程死亡时,通量也是垃圾;即使一旦我指定了 publishOn 和 subscribeOn 行为,它们就应该在自己的线程池中?

我想让这个动作完全被动,任何建议将不胜感激。

Sim*_*slé 6

@Scheduled未与 集成Flux,因此Flux如果您归还它,它将不知道如何处理。结合这一事实,在 Reactor(以及一般的 Reactive Streams)中,在您之前通常不会发生任何事情subscribe(),并且您可以开始查看出了什么问题。

block()实际上是 的一种形式subscribe(),这就是为什么一旦将其添加到代码中它就可以工作的原因。它实际上可能是这里的最佳选择,因为您需要将一段反应式代码(从ReactiveRepository)桥接到命令式阻塞世界(您的@Scheduled fun)。