Anl*_*nly 6 java reactive-programming project-reactor reactive-streams
我只有一个数据项源,我想与多个下游流共享该Flux。
它与参考指南中的示例非常相似,但是我觉得该示例通过.connect()手动调用而作弊。具体来说,我不知道将有多少个下游订户,并且我没有控制权来调用.connect()“最后”。消费者应该能够订阅,但不能立即触发数据提取。然后在将来实际需要数据的某个地方,它们将在必要时提取。
此外,源对消耗很敏感,因此无法重新获取。
除此之外,它会很大,因此不能选择缓冲和重播。
理想地,最重要的是,整个事情发生在一个线程中,因此没有并发或等待。
(不希望为订阅者提供非常小的等待时间)
对于Monos,我几乎可以达到预期的效果(单个最终结果值):
public class CoConsumptionTest {
@Test
public void convenientCoConsumption() {
// List used just for the example:
List<Tuple2<String, String>> source = Arrays.asList(
Tuples.of("a", "1"), Tuples.of("b", "1"), Tuples.of("c", "1"),
Tuples.of("a", "2"), Tuples.of("b", "2"), Tuples.of("c", "2"),
Tuples.of("a", "3"), Tuples.of("b", "3"), Tuples.of("c", "3")
);
// Source which is sensitive to consumption
AtomicInteger consumedCount = new AtomicInteger(0);
Iterator<Tuple2<String, String>> statefulIterator = new Iterator<Tuple2<String, String>>() {
private ListIterator<Tuple2<String, String>> sourceIterator = source.listIterator();
@Override
public boolean hasNext() {
return sourceIterator.hasNext();
}
@Override
public Tuple2<String, String> next() {
Tuple2<String, String> e = sourceIterator.next();
consumedCount.incrementAndGet();
System.out.println("Audit: " + e);
return e;
}
};
// Logic in the service:
Flux<Tuple2<String, String>> f = Flux.fromIterable(() -> statefulIterator);
ConnectableFlux<Tuple2<String, String>> co = f.publish();
Function<Predicate<Tuple2<String, String>>, Mono<Tuple2<String, String>>> findOne = (highlySelectivePredicate) ->
co.filter(highlySelectivePredicate)
.next() //gives us a Mono
.toProcessor() //makes it eagerly subscribe and demand from the upstream, so it wont miss emissions
.doOnSubscribe(s -> co.connect()); //when an actual user consumer subscribes
// Subscribing (outside the service)
assumeThat(consumedCount).hasValue(0);
Mono<Tuple2<String, String>> a2 = findOne.apply(select("a", "2"));
Mono<Tuple2<String, String>> b1 = findOne.apply(select("b", "1"));
Mono<Tuple2<String, String>> c1 = findOne.apply(select("c", "1"));
assertThat(consumedCount).hasValue(0);
// Data is needed
SoftAssertions softly = new SoftAssertions();
assertThat(a2.block()).isEqualTo(Tuples.of("a", "2"));
softly.assertThat(consumedCount).hasValue(4);
assertThat(b1.block()).isEqualTo(Tuples.of("b", "1"));
softly.assertThat(consumedCount).hasValue(4);
assertThat(c1.block()).isEqualTo(Tuples.of("c", "1"));
softly.assertThat(consumedCount).hasValue(4);
softly.assertAll();
}
private static Predicate<Tuple2<String, String>> select(String t1, String t2) {
return e -> e.getT1().equals(t1) && e.getT2().equals(t2);
}
}
Run Code Online (Sandbox Code Playgroud)
问题:我想知道如何实现通量结果,即应用过滤后的多个值,而不仅仅是第一个/下一个。(不过只要求很高尽可能必要)
(试过天真地更换.toProcessor()与.publish().autoConnect(0)但没成功)
编辑1: 虽然不允许对源进行缓冲,但是作为参数的过滤器应该具有很高的选择性,因此在过滤后进行缓冲是可以的。
编辑2:
过一会儿,我回到发布的示例上,尝试使用更高版本的示例reactor,它实际上可以工作。
io.projectreactor:reactor-bom:Californium-SR8
> io.projectreactor:reactor-core:3.2.9.RELEASE
Run Code Online (Sandbox Code Playgroud)
我不喜欢给出“不回答”风格的答案,但我认为至少必须在这里给出您的要求之一。从你的问题来看,要求似乎是:
以一个订阅者从 a 请求数据的情况为例Flux,其中的前几个元素Flux被消耗,然后最终另一个订阅者在未来的任意时间出现,需要相同的数据。有了上述要求,这是不可能的 - 您要么必须再次获取数据,要么将其保存在某个地方,并且您已经排除了这两个选项。
但是,如果您准备稍微放松这些要求,那么有一些潜在的选择:
如果您能以某种方式计算出您最终将获得的订阅者数量,那么您可以在订阅数量达到后autoConnect(n)自动连接到 a 。ConnectableFlux
如果您可以允许删除元素,那么您只需调用share();原始元素Flux即可使其在第一个订阅上自动连接,然后未来的订阅者将删除以前的元素。
这可能是更有前途的策略之一,因为你说:
没有并发或等待。(为订阅者加入提供非常短的等待时间是不可取的)
您可以将其变成Flux热源,在特定时间段内缓存所有发出的元素。这意味着您可以以一定量的内存为代价(但不缓冲整个流),为订阅者提供一小段等待时间,让他们可以订阅并仍然接收所有数据。
与上面类似,您可以使用该方法的另一种变体cache()来仅缓存已知数量的元素。如果您知道可以安全地将n元素装入内存,但不能再多了,那么这可以为您提供订阅者安全连接的最大可能时间。
| 归档时间: |
|
| 查看次数: |
754 次 |
| 最近记录: |