如何使用rxjs存储扫描的累积结果

gab*_*abo 3 javascript reactive-programming reactive-extensions-js rxjs

合并后我有两个合并的observable和一个扫描.第一个是简单范围,另一个是主题.每当Subject发出一个新值时,onNext我在扫描中连接该值并将新数组作为累加器返回.如果我处理了我的订阅,然后再次订阅它会重放该范围中的值,但是我丢失了主题中的值.在下面的代码中,我希望我的第二个订阅的最终值为[1, 2, 3, 4, 5]

最好的方法是什么?现在我有另一个主题,我存储了最终值并订阅了它,但感觉不对.

这是一个简单的版本,演示了正在发生的事情:

var Rx = require('rx');

var source = Rx.Observable.range(1, 3);

var adder = new Rx.Subject();

var merged = source.merge(adder)
                    .scan([], function(accum, x) {
                        return accum.concat(x);
                    });

var subscription1 = merged.subscribe(function(x) {console.log(x)});
adder.onNext(4);
adder.onNext(5);

subscription1.dispose();

console.log('After Disposal');

var subscription2 = merged.subscribe(function(x) {console.log(x)});
Run Code Online (Sandbox Code Playgroud)

这输出:

[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
[ 1, 2, 3, 4 ]
[ 1, 2, 3, 4, 5 ]
After Disposal
[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
Run Code Online (Sandbox Code Playgroud)

And*_*ltz 7

Subject是一个热门的Observable,这就是为什么第二个订阅不会看到来自Subject的事件.Observable范围很冷,因此每个"执行实例"完全归每个订阅所有.另一方面,Subject的"执行实例"是单例且独立的,因此第二个订阅不会看到事件.

有几种方法可以解决这个问题.

  1. 使用ReplaySubject.您必须指定缓冲区大小.如果您对该缓冲区没有良好的限制,则使用无限制的缓冲区可能会导致内存问题.
  2. 避免使用主题.换句话说,避免使用热Observable,用冷Observable替换它,并根据我在开头给出的问题描述,你的订阅不会有问题,他们会看到相同的事件.通常,Subject可以被Observable替换,但这取决于您的整体架构.可能需要重写很多.在最坏的情况下,例如Observables的循环依赖,你无法避免使用Subject.
  3. 重新排列代码,以便订阅在主题开始发布之前开始,因此所有订阅都有机会看到来自热门Observable的"实时"发射.

但是,如果我对此问题的解释是正确的,那么您只需要发出的最后一个事件merged,因此您可以使用替代(1)的变体,其中您只重放最后一个事件.这将是添加的问题.shareReplay(1)merged,这将使其成为热重播观察到.