使用 Spring Webflux 时在后台运行 Mono 并返回响应

Joh*_*han 6 spring-boot project-reactor spring-webflux

这个问题与Spring Web Flux 中的立即返回有关,但我认为不一样(至少那里的答案对我来说并不令人满意)。

我有一个函数返回一个Mono,当被调用时会启动一个长时间运行的工作。当调用 Spring Webflux HTTP API 时会调用此函数。下面是一个例子:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
    val longRunningJob : Mono<Job> = startNewJob(jobId)
    longRunningJob.map { job ->
        val jobUri = generateJobUri(request, job.id)
        ResponseEntity.created(jobURI).build<Unit>()
    }
}
Run Code Online (Sandbox Code Playgroud)

上面代码的问题是“201 Created”是在长时间运行的作业完成创建的。我想longRunningJob在后台启动并立即返回“201 Created”。

我也许可以做这样的事情:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {

    startNewJob(jobId)
        .subscribeOn(Schedulers.newSingle("thread"))
        .subscribe()

    val jobUri = generateJobUri(request, job.id)
    val response = ResponseEntity.created(jobURI).build<Unit>()
    Mono.just(response)
}
Run Code Online (Sandbox Code Playgroud)

但对我来说,必须subscribe()手动调用似乎不太习惯(例如,intellij 抱怨我subscribe()在非阻塞范围内调用)。没有更好的方法来组合两个“流”而不使用显式subscribe?如果是这样,我如何修改startNewJob上面的功能来实现这一点?

Phi*_*lay 7

AFAIK,使用其中一种subscribe方法是真正在后台启动具有自己生命周期的作业(不与返回的发布者绑定)的唯一方法。

如果您要使用其中一个运算符来组合作业发布者和响应发布者(例如zipmerge),则作业发布者的生命周期将与响应发布者绑定,这不是您想要的后台作业。

您可能需要考虑的一件事是在响应发布者流中启动后台作业,而不是直接在方法主体中启动。例如,通过doOnSubscibe或来自响应上游的操作员。

这会将后台作业的启动与响应发布者的 onSubscribe 事件联系起来,但仍允许其在后台完成。

另请注意,如果您希望能够取消后台作业(例如,可能在应用程序关闭期间),则需要保存返回的值,Disposable以便subscribe稍后调用dispose它。通过某种类型的BackgroundJobManager 可以更好地完成此操作,它可以跟踪所有正在运行的作业。