如何将 Mono 变成真正的异步(非反应式!)方法调用?

Hon*_*dek 5 java asynchronous project-reactor spring-webflux

我有一个方法

@Service
public class MyService {
    public Mono<Integer> processData() {
        ... // very long reactive operation
    }
}
Run Code Online (Sandbox Code Playgroud)

在正常的程序流程中,我通过 Kafka 事件异步调用此方法。

出于测试目的,我需要将该方法公开为 Web 服务,但该方法应公开为异步:仅返回 HTTP 代码202 ACCEPTED并继续在后台进行数据处理。

Mono#subscribe()仅仅调用控制器方法并返回可以吗(=它不会有任何不需要的副作用)吗?

@Service
public class MyService {
    public Mono<Integer> processData() {
        ... // very long reactive operation
    }
}
Run Code Online (Sandbox Code Playgroud)

还是这样做更好(这里我对 IntelliJ 的警告感到困惑,也许与https://youtrack.jetbrains.com/issue/IDEA-276018相同?):

@RestController
@RequiredArgsConstructor
public class MyController {
    private final MyService service;

    @GetMapping
    public void processData() {
        service.processData()
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();
    }
}
Run Code Online (Sandbox Code Playgroud)

或者其他解决方案?

Mic*_*rry 6

仅仅调用 Mono#subscribe() 并从控制器方法返回可以吗?

有副作用,但您可能可以忍受它们:

  • 这确实是一劳永逸——这意味着虽然你永远不会收到关于成功的通知(大多数人都意识到),但你也永远不会收到关于失败的通知(很少有人意识到)。
  • 如果该进程由于某种原因挂起,该发布者将永远不会完成,并且您将无法知道。由于您订阅的是有界弹性线程池,因此它也会无限期地占用这些有限线程之一。

您可能会同意第一点,或者您可能希望在反应链中进一步记录一些错误日志,作为某种副作用,这样如果出现问题,您至少会收到内部通知。

对于第二点 - 我建议在您的方法调用上设置一个(慷慨的)超时,这样如果它没有在设定的时间内完成,它至少会被取消,并且不再消耗资源。如果您正在运行异步任务,那么这不是一个大问题,因为它只会消耗一点内存。如果您在弹性调度程序上包装阻塞调用,那么情况会更糟,因为您无限期地占用该线程池中的线程。

我还想问为什么你需要在这里使用有界弹性调度程序 - 它用于包装阻塞调用,这似乎不是这个用例的基础。(需要明确的是,如果您的服务阻塞,那么您绝对应该将其包装在弹性调度程序上 - 但如果不是,则没有理由这样做。)

最后,这个例子:

public Mono<Void> processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
    return Mono.empty();
}
Run Code Online (Sandbox Code Playgroud)

...这是一个不该做的精彩例子,因为您正在创建一种“冒名顶替者反应方法” - 有人可能非常合理地订阅返回的发布者,认为它会在底层发布者完成时完成,这显然不是'这里发生了什么。在这种情况下,使用void返回类型并因此不返回任何内容是正确的做法。