如何避免在执行长时间运行计算的 Spring WebFlux 控制器中使用 Kotlin 协程的 GlobalScope

Qel*_*lix 3 spring kotlin spring-webflux kotlin-coroutines

我有一个使用 Spring WebFlux 和 Kotlin 实现的 Rest API,其端点用于启动长时间运行的计算。由于让调用者等待计算完成并不是很优雅,因此它应该立即返回一个 ID,调用者可以使用该 ID 在不同端点可用时获取结果。计算在后台启动,并且应该在准备好时完成 - 我并不真正关心它何时完成,因为轮询它是调用者的工作。

当我使用 Kotlin 时,我认为解决这个问题的规范方法是使用协程。下面是我的实现的一个最小示例(使用Spring 的 Kotlin DSL而不是传统控制器):

import org.springframework.web.reactive.function.server.coRouter

// ...

fun route() = coRouter {
    POST("/big-computation") { request: ServerRequest ->
        val params = request.awaitBody<LongRunningComputationParams>()
        val runId = GlobalResultStorage.prepareRun(params);
        coroutineScope {
            launch(Dispatchers.Default) {
                GlobalResultStorage.addResult(runId, longRunningComputation(params))
            }
        }
        ok().bodyValueAndAwait(runId)
    }
}
Run Code Online (Sandbox Code Playgroud)

但这并没有达到我想要的效果,因为外部协程(之后的块POST("/big-computation"))会等待其内部协程完成执行,因此只有runId在不再需要它时才返回。

我能找到的唯一可能的方法是使用GlobalScope.launch,它会生成一个没有父级等待其结果的协程,但我到处都读到强烈建议您不要使用它。需要明确的是,有效的代码如下所示:

POST("/big-computation") { request: ServerRequest ->
    val params = request.awaitBody<LongRunningComputationParams>()
    val runId = GlobalResultStorage.prepareRun(params);
    GlobalScope.launch {
        GlobalResultStorage.addResult(runId, longRunningComputation(params))
    }
    ok().bodyValueAndAwait(runId)
}
Run Code Online (Sandbox Code Playgroud)

我是否遗漏了一些非常明显的东西,这些东西将使我的示例使用适当的结构化并发工作,或者这真的是一个合法的用例吗GlobalScope?是否有一种方法可以在不附加到其启动范围的范围内启动长时间运行计算的协程?我能想到的唯一想法是从同一个协程作用域启动计算和请求处理程序,但由于计算取决于请求处理程序,我不知道这是如何可能的。

预先非常感谢!

bro*_*oot 6

也许其他人不会同意我的观点,但我认为这种厌恶GlobalScope有点夸张。我经常有这样的印象:有些人并不真正理解问题所在GlobalScope,他们用具有相似缺点或实际上相同的解决方案来代替它。但好吧,至少他们不再使用邪恶GlobalScope了......

别误会我的意思:GlobalScope很糟糕。特别是因为它太容易使用,所以很容易过度使用它。但很多情况下我们并不真正关心它的缺点。

结构化并发的主要目标是:

  • 自动等待子任务,这样我们就不会在子任务完成之前意外地继续执行。
  • 取消个人工作。
  • 取消/关闭安排后台任务的服务/组件。
  • 异步任务之间的故障传播。

这些功能对于提供可靠的并发应用程序至关重要,但令人惊讶的是,在很多情况下它们都无关紧要。让我们举个例子:如果您的请求处理程序在应用程序的整个时间内都在工作,那么您就不需要等待子任务和关闭功能。您不想传播失败。取消单个子任务在这里并不适用,因为无论我们是否使用GlobalScope或“正确”的解决方案,我们都会通过将任务存储在某处来执行完全相同的操作Job

因此,我想说,气馁的主要原因GlobalScope并不适用于您的情况。

话虽如此,我仍然认为实施通常建议作为GlobalScope. 只需使用您自己的属性创建一个属性CoroutineScope并使用它来启动协程:

private val scope = CoroutineScope(Dispatchers.Default)

fun route() = coRouter {
    POST("/big-computation") { request: ServerRequest ->
        ...
        scope.launch {
            GlobalResultStorage.addResult(runId, longRunningComputation(params))
        }
        ...
    }
}
Run Code Online (Sandbox Code Playgroud)

你不会从中得到太多。它不会帮助您解决资源泄漏问题,也不会让您的代码更可靠等等。但至少它将有助于以某种方式对后台任务进行分类。从技术上讲,可以确定谁是后台任务的所有者。您可以轻松地在一个地方配置所有后台任务,例如提供CoroutineName或切换到另一个线程池。您可以计算当前有多少个活动子任务。如果您需要的话,添加正常关闭会更容易。等等。

但最重要的是:实施成本低廉。你不会得到太多,但也不会花费你太多,所以为什么不呢。