在这种情况下,为什么我们需要Publish和RefCount Rx运算符?

Zso*_*agy 5 java reactive-programming backpressure rx-java

我正在尝试熟悉反应背压处理的问题,特别是通过阅读这个维基:https://github.com/ReactiveX/RxJava/wiki/Backpressure

在缓冲段落中,我们有更多涉及的示例代码:

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
Run Code Online (Sandbox Code Playgroud)

如果我理解正确,我们通过为缓冲区运算符生成去抖动信号流来有效地去除突发源流.

但为什么我们需要在这里使用发布和引用计数器?如果我们放弃它们会导致什么问题?评论并没有让我更清楚,默认情况下RxJava Observables不是多播吗?

Glu*_*uck 3

答案在于热观测值和冷观测值之间的差异。

缓冲区运算符组合了两个流,并且无法知道它们有一个共同的源(在您的情况下)。当激活(订阅)时,它将同时订阅它们,这反过来将触发对您的原始输入的 2 个不同的订阅。

现在可能会发生两件事,要么输入是热可观察的,并且订阅除了注册侦听器之外没有任何效果,并且一切都将按预期工作,或者它是冷可观察的,并且每个订阅将导致潜在的不同和不同步的流。

例如,冷可观察对象可以是在订阅时执行网络请求并通知结果的可观察对象。不调用发布意味着将完成 2 个请求。

发布+引用计数/连接是将冷可观察值转换为热可观察值的常用方法,确保发生单个订阅,并且所有流的行为相同。