Spring Flux 和 Async 注释

Jon*_*uin 7 java spring project-reactor spring-async spring-webflux

我有一个 Spring Flux 应用程序,在某些时候我需要在后台执行一些繁重的任务,调用者(HTTP 请求)不需要等到该任务完成。

如果没有反应器,我可能只会使用Async注释,在不同的线程上执行该方法。对于reactor,我不确定是否应该继续采用这种方法,或者是否已经有内置机制可以实现这一点。

例如,给定一个接受Resource对象的Controller

@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
    processor.run(r); // the caller should not wait for the resource to be processed
    return repository.save(r);
}
Run Code Online (Sandbox Code Playgroud)

和一个处理器类:

@Async
void run(Resource r) { 
    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> result = webClient.get()
                                   .retrieve()
                                   .bodyToMono(String.class);
    String response = result.block(); //block for now
}
Run Code Online (Sandbox Code Playgroud)

HTTP 调用方/create不需要等待run方法完成。

Fri*_*ing 5

我做了一些测试,我认为即使使用subscribe()“一劳永逸”也会等待请求完成,然后再将答案返回到网络浏览器或 REST 客户端(至少在我的简单测试中,看起来是这样)。因此,您必须执行与@Async类似的操作,创建另一个线程:

@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
    return processor.run(r)
    .subscribeOn(Schedulers.elastic()) // put eveything above this line on another thread
    .doOnNext(string -> repository.save(r)); // persist "r", not changing it, though

}
Run Code Online (Sandbox Code Playgroud)

还有一个处理器类:

Mono<String> run(Resource r) { 
    WebClient webClient = WebClient.create("http://localhost:8080");
    return webClient.get()
           .retrieve()
           .bodyToMono(String.class);
}
Run Code Online (Sandbox Code Playgroud)

  • 这不会让调用者等待“processor#run”方法执行并完成吗? (3认同)

Ale*_*kin 5

如果您正在寻找即发即弃模式的实现,您可以订阅您的发布者

@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
    run(r).subscribe();
    return repository.save(r);
}

Mono<Void> run(Resource r) {
    WebClient webClient = WebClient.create("http://localhost:8080");
    return webClient.get()
            .retrieve()
            .bodyToMono(String.class)
            .then();
}
Run Code Online (Sandbox Code Playgroud)

如果您的发布者执行阻塞操作,则应在具有弹性或并行调度程序的其他线程上订阅它。