Java Stream 与 Flux fromIterable

Sar*_*oev 5 java spring java-stream spring-webflux spring-webclient

我有一个用户名列表,想要从远程服务获取用户详细信息而不阻塞主线程。我正在使用 Spring 的反应式客户端 WebClient。对于响应,我获取 Mono,然后订阅它并打印结果。

private Mono<User> getUser(String username) {
    return webClient
            .get()
            .uri(uri + "/users/" + username)
            .retrieve()
            .bodyToMono(User.class)
            .doOnError(e -> 
                logger.error("Error on retrieveing a user details {}", username));
}
Run Code Online (Sandbox Code Playgroud)

我通过两种方式实现了这个任务:

使用Java stream

usernameList.stream()
          .map(this::getUser)
          .forEach(mono ->
                mono.subscribe(System.out::println));
Run Code Online (Sandbox Code Playgroud)

使用Flux.fromIterable

Flux.fromIterable(usernameList)
        .map(this::getUser)
        .subscribe(mono ->
                mono.subscribe(System.out::println));

Run Code Online (Sandbox Code Playgroud)

看来主线程并没有以两种方式被阻塞。Java Stream在这种情况下和有什么区别Flux.fromIterable?如果两者都在做同样的事情,建议使用哪一个?

Bri*_*zel 3

两种变体之间没有太大差异。该Flux.fromIterable变体可能会为您提供更多关于并发/重试等的选项和控制 - 但在这种情况下并非如此,因为subscribe在这里调用违背了目的。

您的问题缺少一些有关您正在构建的应用程序类型以及在何种上下文中进行这些调用的背景信息。如果您正在构建一个 Web 应用程序,并且在请求处理期间调用该应用程序,或者是一个批处理应用程序,那么意见可能会有所不同。

一般来说,我认为应用程序应该远离调用,subscribe因为它会将该管道的处理与应用程序的其余部分断开:如果发生异常,您可能无法报告它,因为用于发送该错误消息的资源可能会那时就消失了。或者应用程序可能正在关闭,而您无法让它等待该任务完成。

如果您正在构建一个想要开始某些工作的应用程序,并且其结果对当前操作没有用处(即该工作在当前操作的生命周期内是否完成并不重要),那么可能subscribe是一个选项。

在这种情况下,我会尝试将所有操作分组到一个Mono<Void>操作中,然后触发该工作:

Mono<Void> logUsers = Flux.fromIterable(userNameList)
    .map(name -> getUser(name))
    .doOnNext(user -> System.out.println(user)) // assuming this is non I/O work
    .then();
logUsers.subscribe(...);
Run Code Online (Sandbox Code Playgroud)

如果您担心在 Web 应用程序中消耗服务器线程,那么情况确实不同 - 您可能希望获取该操作的结果以向 HTTP 响应写入某些内容。通过调用 subscribe,两个任务现在都已断开连接,并且在工作完成时 HTTP 响应可能早已消失(并且在写入响应时您会收到错误)。

在这种情况下,您应该将操作与 Reactor 运算符链接起来。