项目 Reactor:如何控制通量排放

yas*_*eco 2 java reactive-programming project-reactor reactive-streams

我有一个会发出一些Date. 这Date映射到我在某些Executer.

我想做的是等待所有 1024 个 HTTP 请求,然后再发出下一个Date.

目前运行时,onNext()被调用多次,然后稳定在某个稳定的速率。

我怎样才能改变这种行为?

PS 如果需要的话,我愿意改变架构。

private void run() throws Exception {

    Executor executor = Executors.newFixedThreadPool(2);

    Flux<Date> source = Flux.generate(emitter ->
        emitter.next(new Date())
    );

    source
            .log()
            .limitRate(1)
            .doOnNext(date -> System.out.println("on next: " + date))
            .map(date -> Flux.range(0, 1024))
            .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                    .subscribeOn(Schedulers.fromExecutor(executor)))
            .subscribe(s -> System.out.println(s));

    Thread.currentThread().join();
}
Run Code Online (Sandbox Code Playgroud)

HTTP请求模拟:

private static String simulateHttp() {
    try {
        System.out.println("start http call");
        Thread.sleep(3_000);
    } catch (Exception e) {}

    return "HTML content";
}
Run Code Online (Sandbox Code Playgroud)

编辑:改编自答案的代码:

  • 首先,我的代码中有一个错误(flatMap需要另一个错误)
  • 其次,我给两者都添加了concurrency参数(似乎两者都需要)1flatMap

    Executor executor = Executors.newSingleThreadExecutor();
    
    Flux<Date> source = Flux.generate(emitter -> {
        System.out.println("emitter called!");
        emitter.next(new Date());
    });
    
    source
            .limitRate(1)
            .map(date -> Flux.range(0, 16))
            .flatMap(Function.identity(), 1) # concurrency = 1
            .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                    .subscribeOn(Schedulers.fromExecutor(executor)), 1) # concurrency = 1
            .subscribe(s -> System.out.println(s));
    
    Thread.currentThread().join();
    
    Run Code Online (Sandbox Code Playgroud)

Mar*_*nyi 5

您应该看看这些方法:

concatMap确保在算子内按顺序处理通量上的元素:

内部函数的生成和订阅:该运算符等待一个内部函数完成,然后再生成下一个内部函数并订阅它。

flatMapconcurrency允许您通过公开和参数来执行相同的操作prefetch,这使您可以更好地控制此行为:

并发参数允许控制可以并行订阅和合并的发布者数量。反过来,该参数显示向上游发送的第一个 Subscription.request(long) 的大小。prefetch 参数允许为合并的 Publisher 提供任意的预取大小(换句话说,预取大小意味着合并的 Publisher 的第一个 Subscription.request(long) 的大小)。