Project Reactor:ConnectableFlux按需自动连接

Anl*_*nly 6 java reactive-programming project-reactor reactive-streams

我只有一个数据项源,我想与多个下游流共享该Flux。

它与参考指南中的示例非常相似,但是我觉得该示例通过.connect()手动调用而作弊。具体来说,我不知道将有多少个下游订户,并且我没有控制权来调用.connect()“最后”。消费者应该能够订阅,但不能立即触发数据提取。然后在将来实际需要数据的某个地方,它们将在必要时提取。

此外,源对消耗很敏感,因此无法重新获取。
除此之外,它会很大,因此不能选择缓冲和重播。

理想地,最重要的是,整个事情发生在一个线程中,因此没有并发或等待。
(不希望为订阅者提供非常小的等待时间)

对于Monos,我几乎可以达到预期的效果(单个最终结果值):

public class CoConsumptionTest {
    @Test
    public void convenientCoConsumption() {
        // List used just for the example:
        List<Tuple2<String, String>> source = Arrays.asList(
                Tuples.of("a", "1"), Tuples.of("b", "1"), Tuples.of("c", "1"),
                Tuples.of("a", "2"), Tuples.of("b", "2"), Tuples.of("c", "2"),
                Tuples.of("a", "3"), Tuples.of("b", "3"), Tuples.of("c", "3")
        );

        // Source which is sensitive to consumption
        AtomicInteger consumedCount = new AtomicInteger(0);
        Iterator<Tuple2<String, String>> statefulIterator = new Iterator<Tuple2<String, String>>() {
            private ListIterator<Tuple2<String, String>> sourceIterator = source.listIterator();

            @Override
            public boolean hasNext() {
                return sourceIterator.hasNext();
            }

            @Override
            public Tuple2<String, String> next() {
                Tuple2<String, String> e = sourceIterator.next();
                consumedCount.incrementAndGet();
                System.out.println("Audit: " + e);
                return e;
            }
        };

        // Logic in the service:
        Flux<Tuple2<String, String>> f = Flux.fromIterable(() -> statefulIterator);
        ConnectableFlux<Tuple2<String, String>> co = f.publish();

        Function<Predicate<Tuple2<String, String>>, Mono<Tuple2<String, String>>> findOne = (highlySelectivePredicate) ->
                co.filter(highlySelectivePredicate)
                        .next() //gives us a Mono
                        .toProcessor() //makes it eagerly subscribe and demand from the upstream, so it wont miss emissions
                        .doOnSubscribe(s -> co.connect()); //when an actual user consumer subscribes

        // Subscribing (outside the service)
        assumeThat(consumedCount).hasValue(0);
        Mono<Tuple2<String, String>> a2 = findOne.apply(select("a", "2"));
        Mono<Tuple2<String, String>> b1 = findOne.apply(select("b", "1"));
        Mono<Tuple2<String, String>> c1 = findOne.apply(select("c", "1"));
        assertThat(consumedCount).hasValue(0);

        // Data is needed
        SoftAssertions softly = new SoftAssertions();

        assertThat(a2.block()).isEqualTo(Tuples.of("a", "2"));
        softly.assertThat(consumedCount).hasValue(4);

        assertThat(b1.block()).isEqualTo(Tuples.of("b", "1"));
        softly.assertThat(consumedCount).hasValue(4);

        assertThat(c1.block()).isEqualTo(Tuples.of("c", "1"));
        softly.assertThat(consumedCount).hasValue(4);

        softly.assertAll();
    }

    private static Predicate<Tuple2<String, String>> select(String t1, String t2) {
        return e -> e.getT1().equals(t1) && e.getT2().equals(t2);
    }
}
Run Code Online (Sandbox Code Playgroud)

问题:我想知道如何实现通量结果,即应用过滤后的多个值,而不仅仅是第一个/下一个。(不过只要求很高尽可能必要)
(试过天真地更换.toProcessor().publish().autoConnect(0)但没成功)

编辑1: 虽然不允许对源进行缓冲,但是作为参数的过滤器应该具有很高的选择性,因此在过滤后进行缓冲是可以的。

编辑2: 过一会儿,我回到发布的示例上,尝试使用更高版本的示例reactor,它实际上可以工作。

io.projectreactor:reactor-bom:Californium-SR8
> io.projectreactor:reactor-core:3.2.9.RELEASE
Run Code Online (Sandbox Code Playgroud)

Mic*_*rry 1

我不喜欢给出“不回答”风格的答案,但我认为至少必须在这里给出您的要求之一从你的问题来看,要求似乎是:

  • 不允许缓冲
  • 不允许删除元素
  • 订阅者数量未知
  • 订阅者可以随时连接
  • 每个订阅者都必须在需要时获得所有可用数据
  • 无需从源重新获取

以一个订阅者从 a 请求数据的情况为例Flux,其中的前几个元素Flux被消耗,然后最终另一个订阅者在未来的任意时间出现,需要相同的数据。有了上述要求,这是不可能的 - 您要么必须再次获取数据,要么将其保存在某个地方,并且您已经排除了这两个选项。

但是,如果您准备稍微放松这些要求,那么有一些潜在的选择:

已知订阅者数量

如果您能以某种方式计算出您最终将获得的订阅者数量,那么您可以在订阅数量达到后autoConnect(n)自动连接到 a 。ConnectableFlux

允许删除元素

如果您可以允许删除元素,那么您只需调用share();原始元素Flux即可使其在第一个订阅上自动连接,然后未来的订阅者将删除以前的元素。

允许订阅者有时间连接

这可能是更有前途的策略之一,因为你说:

没有并发或等待。(为订阅者加入提供非常短的等待时间是不可取的)

您可以将其变成Flux热源,在特定时间段内缓存所有发出的元素。这意味着您可以以一定量的内存为代价(但不缓冲整个流),为订阅者提供一小段等待时间,让他们可以订阅并仍然接收所有数据。

缓冲已知数量的元素

与上面类似,您可以使用该方法的另一种变体cache()来仅缓存已知数量的元素。如果您知道可以安全地将n元素装入内存,但不能再多了,那么这可以为您提供订阅者安全连接的最大可能时间。