reactor与反应堆中的flatMap

shr*_*ing 20 java project-reactor

我已经找到了很多关于RxJava的答案,但我想了解它在Reactor中是如何工作的.

我目前的理解非常模糊,我倾向于认为map是同步的,而flatMap是异步的,但我不能真正理解它.

这是一个例子:

files.flatMap { it ->
    Mono.just(Paths.get(UPLOAD_ROOT, it.filename()).toFile())
        .map {destFile ->
            destFile.createNewFile()
            destFile    
        }               
        .flatMap(it::transferTo)
}.then()  
Run Code Online (Sandbox Code Playgroud)

我有文件(a Flux<FilePart>),我想将它复制到UPLOAD_ROOT服务器上的一些.

这个例子来自一本书.

我可以改变一切.map,以.flatMap反之亦然,一切仍然有效.我想知道区别是什么.

Sim*_*slé 32

  • map 用于同步,非阻塞,1对1转换
  • flatMap 用于异步(非阻塞)1到N转换

差异在方法签名中可见:

  • map拿a Function<T, U>并返回aFlux<U>
  • flatMap拿a Function<T, Publisher<V>>并返回aFlux<V>

这是主要的提示:你可以传递Function<T, Publisher<V>>给a map,但它不知道如何处理Publishers,这将导致Flux<Publisher<V>>一系列惰性发布者.

另一方面,flatMap期望Publisher<V>每个人T.它知道如何处理它:订阅它并在输出序列中传播它的元素.其结果,返回类型为Flux<V>:flatMap将平坦化每个内Publisher<V>进入的输出序列的所有V

关于1-N方面:

对于每个<T>输入元素,flatMap将其映射到a Publisher<V>.在某些情况下(例如,HTTP请求),该发布者将只发出一个项目,在这种情况下,我们非常接近异步map.

但那是堕落的情况.一般情况是a Publisher可以发出多个元素,并且也flatMap可以正常工作.

举个例子,想象一下你有一个被动数据库,你可以从一系列用户ID中获得flatMap,并返回一个用户集的请求Badge.您最终获得了Flux<Badge>所有这些用户的所有徽章中的一个.

map真的是同步和非阻塞吗?

是的:它在操作符应用它的方式上是同步的(一个简单的方法调用,然后运算符发出结果)和非阻塞,因为函数本身不应该阻止操作符调用它.换句话说,它不应该引入延迟.那是因为a Flux仍然是异步的整体.如果它阻止中间序列,它将影响其余的Flux处理,甚至其他Flux.

如果您的map函数阻塞/引入延迟但无法转换为返回a Publisher,请考虑publishOn/ subscribeOn来抵消阻塞单独线程上的工作.

  • 不,map函数应该是非阻塞的(除非你也使用`publishOn` /`subscribeOn`来抵消单独线程上的工作).也就是说,它是同步执行的,但不应该有延迟.flatMap函数是异步的,实际上操作员在结果可用时将其展平 (2认同)

Rao*_*ouf 11

flatMap 方法与 map 方法类似,主要区别在于您提供给它的供应商应该返回一个Mono<T>Flux<T>

使用 map 方法会产生一个 ,Mono<Mono<T>> 而使用 flatMap 会产生一个Mono<T>

例如,当您必须使用返回 Mono 的 java API 进行网络调用来检索数据,然后另一个网络调用需要第一个网络调用的结果时,它非常有用。

// Signature of the HttpClient.get method
Mono<JsonObject> get(String url);

// The two urls to call
String firstUserUrl = "my-api/first-user";
String userDetailsUrl = "my-api/users/details/"; // needs the id at the end

// Example with map
Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl).
  map(user -> HttpClient.get(userDetailsUrl + user.getId()));
// This results with a Mono<Mono<...>> because HttpClient.get(...)
// returns a Mono

// Same example with flatMap
Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl).
  flatMap(user -> HttpClient.get(userDetailsUrl + user.getId()));
// Now the result has the type we expected

Run Code Online (Sandbox Code Playgroud)

此外,它还允许精确处理错误:

public UserApi {
  
  private HttpClient httpClient;
    
  Mono<User> findUser(String username) {
    String queryUrl = "http://my-api-address/users/" + username;
    
    return Mono.fromCallable(() -> httpClient.get(queryUrl)).
      flatMap(response -> {
        if (response.statusCode == 404) return Mono.error(new NotFoundException("User " + username + " not found"));
        else if (response.statusCode == 500) return Mono.error(new InternalServerErrorException());
        else if (response.statusCode != 200) return Mono.error(new Exception("Unknown error calling my-api"));
        return Mono.just(response.data);
      });
  }
                                           
}
Run Code Online (Sandbox Code Playgroud)


Zah*_*han 7

Map 在 Reactor 内部如何工作。

\n

MAP 内部如何运作

\n

创建一个Player类。

\n
@Data\n@AllArgsConstructor\npublic class Player {\n        String name;\n        String name;\n}\n\n
Run Code Online (Sandbox Code Playgroud)\n

现在创建一些Player类的实例

\n
Flux<Player> players = Flux.just(\n        "Zahid Khan",\n        "Arif Khan",\n        "Obaid Sheikh")\n        .map(fullname -> {\n            String[] split = fullname.split("\\\\s");\n            return new Player(split[0], split[1]);\n        });\n\nStepVerifier.create(players)\n          .expectNext(new Player("Zahid", "Khan"))\n          .expectNext(new Player("Arif", "Khan"))\n          .expectNext(new Player("Obaid", "Sheikh"))\n          .verifyComplete();\n
Run Code Online (Sandbox Code Playgroud)\n
\n

关于 map(),\xe2\x80\x99 需要了解的重要一点是映射是同步执行的,因为每个项目都是由源 Flux 发布的。如果要异步执行映射,则应考虑 flatMap () 手术。

\n
\n

FlatMap 内部如何工作。

\n

FlatMap 内部如何工作。

\n
Flux<Player> players = Flux.just(\n      "Zahid Khan", \n      "Arif Khan", \n      "Obaid Sheikh")\n      .flatMap(\n            fullname -> \n                  Mono.just(fullname).map(p -> {\n                        String[] split = p.split("\\\\s");\n                        return new Player(split[0], split[1]);\n        }).subscribeOn(Scheduler.parallel()));\n\n        List<Player> playerList = Arrays.asList(\n                  new Player("Zahid", "Khan"),\n                  new Player("Arif", "Khan"), \n                  new Player("Obaid", "Sheikh"));\n\n        StepVerifier.create(players).expectNextMatches(player ->         \n                playerList.contains(player))    \n                        .expectNextMatches(player ->  \n                                playerList.contains(player))\n                        .expectNextMatches(player -> \n                                playerList.contains(player))\n                        .expectNextMatches(player -> \n                                playerList.contains(player))\n                        .verifyComplete();\n\n
Run Code Online (Sandbox Code Playgroud)\n

在 Flatmap() 内部,对 Mono 执行 map() 操作以将 String 转换为 Player。此外,subscribeOn() 指示每个订阅应该在并行线程中进行。 如果没有 subscribeOn(),则 flatmap() 充当同步函数。

\n

该映射用于同步、非阻塞、一对一转换\n而 flatMap 用于异步(非阻塞)一对多转换。

\n