shr*_*ing 20 java project-reactor
我已经找到了很多关于RxJava的答案,但我想了解它在Reactor中是如何工作的.
我目前的理解非常模糊,我倾向于认为map是同步的,而flatMap是异步的,但我不能真正理解它.
这是一个例子:
files.flatMap { it ->
Mono.just(Paths.get(UPLOAD_ROOT, it.filename()).toFile())
.map {destFile ->
destFile.createNewFile()
destFile
}
.flatMap(it::transferTo)
}.then()
Run Code Online (Sandbox Code Playgroud)
我有文件(a Flux<FilePart>),我想将它复制到UPLOAD_ROOT服务器上的一些.
这个例子来自一本书.
我可以改变一切.map,以.flatMap反之亦然,一切仍然有效.我想知道区别是什么.
Sim*_*slé 32
map 用于同步,非阻塞,1对1转换flatMap 用于异步(非阻塞)1到N转换差异在方法签名中可见:
map拿a Function<T, U>并返回aFlux<U>flatMap拿a Function<T, Publisher<V>>并返回aFlux<V>这是主要的提示:你可以传递Function<T, Publisher<V>>给a map,但它不知道如何处理Publishers,这将导致Flux<Publisher<V>>一系列惰性发布者.
另一方面,flatMap期望Publisher<V>每个人T.它知道如何处理它:订阅它并在输出序列中传播它的元素.其结果,返回类型为Flux<V>:flatMap将平坦化每个内Publisher<V>进入的输出序列的所有的V第
关于1-N方面:
对于每个<T>输入元素,flatMap将其映射到a Publisher<V>.在某些情况下(例如,HTTP请求),该发布者将只发出一个项目,在这种情况下,我们非常接近异步map.
但那是堕落的情况.一般情况是a Publisher可以发出多个元素,并且也flatMap可以正常工作.
举个例子,想象一下你有一个被动数据库,你可以从一系列用户ID中获得flatMap,并返回一个用户集的请求Badge.您最终获得了Flux<Badge>所有这些用户的所有徽章中的一个.
map真的是同步和非阻塞吗?
是的:它在操作符应用它的方式上是同步的(一个简单的方法调用,然后运算符发出结果)和非阻塞,因为函数本身不应该阻止操作符调用它.换句话说,它不应该引入延迟.那是因为a Flux仍然是异步的整体.如果它阻止中间序列,它将影响其余的Flux处理,甚至其他Flux.
如果您的map函数阻塞/引入延迟但无法转换为返回a Publisher,请考虑publishOn/ subscribeOn来抵消阻塞单独线程上的工作.
Rao*_*ouf 11
flatMap 方法与 map 方法类似,主要区别在于您提供给它的供应商应该返回一个Mono<T>或 Flux<T>。
使用 map 方法会产生一个 ,Mono<Mono<T>>
而使用 flatMap 会产生一个Mono<T>。
例如,当您必须使用返回 Mono 的 java API 进行网络调用来检索数据,然后另一个网络调用需要第一个网络调用的结果时,它非常有用。
// Signature of the HttpClient.get method
Mono<JsonObject> get(String url);
// The two urls to call
String firstUserUrl = "my-api/first-user";
String userDetailsUrl = "my-api/users/details/"; // needs the id at the end
// Example with map
Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl).
map(user -> HttpClient.get(userDetailsUrl + user.getId()));
// This results with a Mono<Mono<...>> because HttpClient.get(...)
// returns a Mono
// Same example with flatMap
Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl).
flatMap(user -> HttpClient.get(userDetailsUrl + user.getId()));
// Now the result has the type we expected
Run Code Online (Sandbox Code Playgroud)
此外,它还允许精确处理错误:
public UserApi {
private HttpClient httpClient;
Mono<User> findUser(String username) {
String queryUrl = "http://my-api-address/users/" + username;
return Mono.fromCallable(() -> httpClient.get(queryUrl)).
flatMap(response -> {
if (response.statusCode == 404) return Mono.error(new NotFoundException("User " + username + " not found"));
else if (response.statusCode == 500) return Mono.error(new InternalServerErrorException());
else if (response.statusCode != 200) return Mono.error(new Exception("Unknown error calling my-api"));
return Mono.just(response.data);
});
}
}
Run Code Online (Sandbox Code Playgroud)
创建一个Player类。
@Data\n@AllArgsConstructor\npublic class Player {\n String name;\n String name;\n}\n\nRun Code Online (Sandbox Code Playgroud)\n现在创建一些Player类的实例
Flux<Player> players = Flux.just(\n "Zahid Khan",\n "Arif Khan",\n "Obaid Sheikh")\n .map(fullname -> {\n String[] split = fullname.split("\\\\s");\n return new Player(split[0], split[1]);\n });\n\nStepVerifier.create(players)\n .expectNext(new Player("Zahid", "Khan"))\n .expectNext(new Player("Arif", "Khan"))\n .expectNext(new Player("Obaid", "Sheikh"))\n .verifyComplete();\nRun Code Online (Sandbox Code Playgroud)\n\n\n关于 map(),\xe2\x80\x99 需要了解的重要一点是映射是同步执行的,因为每个项目都是由源 Flux 发布的。如果要异步执行映射,则应考虑 flatMap () 手术。
\n
Flux<Player> players = Flux.just(\n "Zahid Khan", \n "Arif Khan", \n "Obaid Sheikh")\n .flatMap(\n fullname -> \n Mono.just(fullname).map(p -> {\n String[] split = p.split("\\\\s");\n return new Player(split[0], split[1]);\n }).subscribeOn(Scheduler.parallel()));\n\n List<Player> playerList = Arrays.asList(\n new Player("Zahid", "Khan"),\n new Player("Arif", "Khan"), \n new Player("Obaid", "Sheikh"));\n\n StepVerifier.create(players).expectNextMatches(player -> \n playerList.contains(player)) \n .expectNextMatches(player -> \n playerList.contains(player))\n .expectNextMatches(player -> \n playerList.contains(player))\n .expectNextMatches(player -> \n playerList.contains(player))\n .verifyComplete();\n\nRun Code Online (Sandbox Code Playgroud)\n在 Flatmap() 内部,对 Mono 执行 map() 操作以将 String 转换为 Player。此外,subscribeOn() 指示每个订阅应该在并行线程中进行。 如果没有 subscribeOn(),则 flatmap() 充当同步函数。
\n该映射用于同步、非阻塞、一对一转换\n而 flatMap 用于异步(非阻塞)一对多转换。
\n