我正在尝试使用 Reactor Flux 逐个处理数字列表,例如 1 到 10,并且有一个 API /double可以简单地将传入的整数加倍(1 -> 2, 4 -> 8.. .),但是这个API有性能问题,总是需要2秒才能响应结果。当使用limitRate(1)我所期望的时,Reactor 会一个接一个地处理请求,如下所示:
2020-01-01 00:00:02 - 2
2020-01-01 00:00:04 - 4
2020-01-01 00:00:06 - 6
2020-01-01 00:00:08 - 8
2020-01-01 00:00:10 - 10
...
Run Code Online (Sandbox Code Playgroud)
但实际上 Reactor 会立即触发所有请求:
2020-01-01 00:00:02 - 6
2020-01-01 00:00:02 - 10
2020-01-01 00:00:02 - 2
2020-01-01 00:00:02 - 4
2020-01-01 00:00:02 - 8
...
Run Code Online (Sandbox Code Playgroud)
这是代码
Flux.range(1, 10).limitRate(1)
.flatMap(i -> webClient.get().uri("http://localhost:10001/double?integer={int}", i).exchange()
.flatMap(resp -> resp.bodyToMono(Integer.class)))
.subscribe(System.out::println);
Thread.sleep(10000);
Run Code Online (Sandbox Code Playgroud)
似乎limitRate没有按我的预期工作,出了什么问题?有没有办法使用 Reactor 来处理一个又一个请求?提前致谢。
.flatMap在这里不起作用,因为它急切地订阅内部流 - 也就是说,它不会等待内部流onComplete在订阅下一个流之前发出 an 。这就是为什么您的所有呼叫都是同时进行的。它在 模式下工作receive->dispatch->receive->dispatch。
Reactor 提供了一个重载版本,flatMap您可以在其中将并发因子指定为.flatMap(innerstream, concurrency)。该因素限制了 flatMap 将订阅的流的数量。如果是 5 个,则 flatMap 最多可以订阅 5 个内部流。一旦达到此限制,它就必须等待内部流发出,onComplete然后才能订阅下一个内部流。
对于您的情况,您可以将其设置为 1 或使用.concatMap()。concatMap()与 flatMap 完全相同concurrency = 1。它基本上可以在该receive->dispatch->wait->receive->dispatch->wait模式下工作。
我不久前写过一篇文章,详细解释了它的flatMap工作原理,因为我认为很多人在使用它时并不了解其内部结构。你可以参考这里的文章