Reactor GroupedFlux - 等待完成

Mir*_*eaG 6 project-reactor

有一个像下面这样的异步发布者,Project Reactor 有没有办法等到整个流完成处理?
当然,不必添加一个未知持续时间的睡眠......

@Test
public void groupByPublishOn() throws InterruptedException {
    UnicastProcessor<Integer> processor = UnicastProcessor.create();

    List<Integer> results = new ArrayList<>();
    Flux<Flux<Integer>> groupPublisher = processor.publish(1)
                                                  .autoConnect()
                                                  .groupBy(i -> i % 2)
                                                  .map(group -> group.publishOn(Schedulers.parallel()));

    groupPublisher.log()
                  .subscribe(g -> g.log()
                                   .subscribe(results::add));

    List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
    input.forEach(processor::onNext);
    processor.onComplete();

    Thread.sleep(500);

    Assert.assertTrue(results.size() == input.size());
}
Run Code Online (Sandbox Code Playgroud)

Sim*_*slé 6

您可以替换这些行:

 groupPublisher.log()
                  .subscribe(g -> g.log()
                                   .subscribe(results::add));
Run Code Online (Sandbox Code Playgroud)

有了这个

groupPublisher.log()
              .flatMap(g -> g.log()
                             .doOnNext(results::add)
              )
              .blockLast();
Run Code Online (Sandbox Code Playgroud)

flatMap 是一种比 subscribe-within-subscribe 更好的模式,它将为您订阅该组。

doOnNext 处理消耗的副作用(向集合添加值),使您无需在订阅中执行该操作。

blockLast() 替换订阅,而不是让您为它阻塞的事件提供处理程序,直到完成(并返回最后发出的项目,但您已经在 doOnNext 中处理过了)。