我只有一个数据项源,我想与多个下游流共享该Flux。
它与参考指南中的示例非常相似,但是我觉得该示例通过.connect()手动调用而作弊。具体来说,我不知道将有多少个下游订户,并且我没有控制权来调用.connect()“最后”。消费者应该能够订阅,但不能立即触发数据提取。然后在将来实际需要数据的某个地方,它们将在必要时提取。
此外,源对消耗很敏感,因此无法重新获取。
除此之外,它会很大,因此不能选择缓冲和重播。
理想地,最重要的是,整个事情发生在一个线程中,因此没有并发或等待。
(不希望为订阅者提供非常小的等待时间)
对于Monos,我几乎可以达到预期的效果(单个最终结果值):
public class CoConsumptionTest {
@Test
public void convenientCoConsumption() {
// List used just for the example:
List<Tuple2<String, String>> source = Arrays.asList(
Tuples.of("a", "1"), Tuples.of("b", "1"), Tuples.of("c", "1"),
Tuples.of("a", "2"), Tuples.of("b", "2"), Tuples.of("c", "2"),
Tuples.of("a", "3"), Tuples.of("b", "3"), Tuples.of("c", "3")
);
// Source which is sensitive to consumption
AtomicInteger consumedCount = new AtomicInteger(0);
Iterator<Tuple2<String, String>> statefulIterator = new Iterator<Tuple2<String, String>>() {
private ListIterator<Tuple2<String, String>> sourceIterator = source.listIterator();
@Override …Run Code Online (Sandbox Code Playgroud)