yas*_*eco 2 java reactive-programming project-reactor reactive-streams
我有一个会发出一些Date. 这Date映射到我在某些Executer.
我想做的是等待所有 1024 个 HTTP 请求,然后再发出下一个Date.
目前运行时,onNext()被调用多次,然后稳定在某个稳定的速率。
我怎样才能改变这种行为?
PS 如果需要的话,我愿意改变架构。
private void run() throws Exception {
Executor executor = Executors.newFixedThreadPool(2);
Flux<Date> source = Flux.generate(emitter ->
emitter.next(new Date())
);
source
.log()
.limitRate(1)
.doOnNext(date -> System.out.println("on next: " + date))
.map(date -> Flux.range(0, 1024))
.flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
.subscribeOn(Schedulers.fromExecutor(executor)))
.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
Run Code Online (Sandbox Code Playgroud)
HTTP请求模拟:
private static String simulateHttp() {
try {
System.out.println("start http call");
Thread.sleep(3_000);
} catch (Exception e) {}
return "HTML content";
}
Run Code Online (Sandbox Code Playgroud)
编辑:改编自答案的代码:
flatMap需要另一个错误)其次,我给两者都添加了concurrency参数(似乎两者都需要)1flatMap
Executor executor = Executors.newSingleThreadExecutor();
Flux<Date> source = Flux.generate(emitter -> {
System.out.println("emitter called!");
emitter.next(new Date());
});
source
.limitRate(1)
.map(date -> Flux.range(0, 16))
.flatMap(Function.identity(), 1) # concurrency = 1
.flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
.subscribeOn(Schedulers.fromExecutor(executor)), 1) # concurrency = 1
.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
Run Code Online (Sandbox Code Playgroud)您应该看看这些方法:
concatMap确保在算子内按顺序处理通量上的元素:
内部函数的生成和订阅:该运算符等待一个内部函数完成,然后再生成下一个内部函数并订阅它。
flatMapconcurrency允许您通过公开和参数来执行相同的操作prefetch,这使您可以更好地控制此行为:
并发参数允许控制可以并行订阅和合并的发布者数量。反过来,该参数显示向上游发送的第一个 Subscription.request(long) 的大小。prefetch 参数允许为合并的 Publisher 提供任意的预取大小(换句话说,预取大小意味着合并的 Publisher 的第一个 Subscription.request(long) 的大小)。
| 归档时间: |
|
| 查看次数: |
3475 次 |
| 最近记录: |