有一个像下面这样的异步发布者,Project Reactor 有没有办法等到整个流完成处理?
当然,不必添加一个未知持续时间的睡眠......
@Test
public void groupByPublishOn() throws InterruptedException {
UnicastProcessor<Integer> processor = UnicastProcessor.create();
List<Integer> results = new ArrayList<>();
Flux<Flux<Integer>> groupPublisher = processor.publish(1)
.autoConnect()
.groupBy(i -> i % 2)
.map(group -> group.publishOn(Schedulers.parallel()));
groupPublisher.log()
.subscribe(g -> g.log()
.subscribe(results::add));
List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
input.forEach(processor::onNext);
processor.onComplete();
Thread.sleep(500);
Assert.assertTrue(results.size() == input.size());
}
Run Code Online (Sandbox Code Playgroud)