反应式管道中的后台任务(即发即忘)

ela*_*tic 5 reactive-programming project-reactor spring-webflux

我有一个反应式管道来处理传入的请求。对于每个请求,我需要调用一个与业务相关的函数 ( doSomeRelevantProcessing)。

完成后,我需要通知一些外部服务发生了什么。管道的那部分不应增加总体响应时间。此外,通知此外部系统并不是业务关键:在管道的主要部分完成后给出快速响应比确保通知成功更重要。

据我所知,在后台运行某些内容而不减慢整个过程的唯一方法是直接在管道中订阅,从而实现“一劳永逸”的心态。

除了在 内订阅之外,还有其他好的选择吗flatmap?我有点担心如果通知外部服务花费的时间比原始处理时间长并且同时收到大量请求会发生什么情况。这是否会导致内存耗尽或整个进程阻塞?

fun runPipeline(incoming: Mono<Request>) = incoming
    .flatMap { doSomeRelevantProcessing(it) } // this should not be delayed
    .flatMap { doBackgroundJob(it) } // this can take a moment, but is not super critical

fun doSomeRelevantProcessing(request: Request) = Mono.just(request) // do some processing

fun doBackgroundJob(request: Request) = Mono.deferContextual { ctx: ContextView ->
    val notification = "notification" // build an object from context

    // this uses non-blocking HTTP (i.e. webclient), so it can take a second or so 
    notifyExternalService(notification).subscribeOn(Schedulers.boundedElastic()).subscribe()

    Mono.just(Unit)
}

fun notifyExternalService(notification: String) = Mono.just(Unit) // might take a while

Run Code Online (Sandbox Code Playgroud)

Mic*_*rry 5

我回答这个问题是假设您使用纯粹的反应机制通知外部服务 - 即您没有包装阻塞服务。如果是,那么答案会有所不同,因为您受到有界弹性线程池大小的限制,如果每秒传入数百个请求,则可能很快就会不堪重负。

(假设您使用的是反应机制,那么就不需要.subscribeOn(Schedulers.boundedElastic())像您在示例中给出的那样,因为这不会给您带来任何东西 - 它是为包装遗留阻塞服务而设计的。)

这会导致内存耗尽吗

这只是在非常极端的情况下才有可能,每个单独的请求使用的内存都会很小。这几乎肯定不值得担心,如果您开始在这里看到内存问题,那么您几乎肯定会在其他地方遇到其他问题。

话虽这么说,我可能建议.timeout(Duration.ofSeconds(5))在您的内部订阅方法之前添加或类似的内容,以确保请求在一段时间后被终止,如果它们因任何原因没有工作 - 这将防止它们建立。

...或者[这会导致]整个过程被阻塞吗?

这个更容易 - 简而言之,不,它不能。