将未知数量的可观察对象与 RxJS 合并

Rob*_*ert 5 javascript reactive-programming observable rxjs

我目前正在尝试创建一个小项目来演示使用 RxJS 进行响应式编程。目标是向我的同事展示这个东西就在那里,值得研究。我对框架没有经验,所以这让事情变得复杂。

我正在尝试扩展我的另一个演示以使用 RxJS。不是一个很复杂的demo,基本上我可以添加任意数量的小表格,得到一个小公式计算出来的数字,还有一个按钮,可以汇总所有表格的值。

在表格中计算公式很容易,但我想我可以走得更远。

我想通过合并的 observable 自动完成求和运算。我想出的唯一解决方案是:

//dummy observables
var s1 = Rx.Observable.Interval(100);
var s2 = Rx.Observable.Interval(200);

//Combine the observables
var m = s1.combineLatest(s2, function(x,y){return x+y});

//Subscribe to the combined observable
var sub = m.subscribe(function(x){console.log(x)});
//A new observable is created
var s3 = Rx.Observable.Interval(300);
//Now I need to update all my subscriptions, wich is a pain.
m = m.combine(s3, function(x,y){return x+y});
sub.dispose();
sub=m.subscribe(function(x){console.log(x)});
Run Code Online (Sandbox Code Playgroud)

我想我可以得到另一个 observable 来通知我的订阅者自我更新 - 因为知道我的所有订阅者的工作方式会使整个架构变得毫无用处,但这对于这样的任务来说听起来有点矫枉过正,我不仅仅是指演示,我真的无法想象有一个“每天”真实世界的例子,这样的架构比仅仅观察任何变化和从我的表单“主动”获取计算值更能让事情变得更干净。

我可能会在处理表单的模块内主动获取和汇总值,并让外部世界订阅“m”可观察对象,将我的值从模块内部推入其中。

这是一个正确的方法吗?我认为是的,因为它们归我的模块所有,我应该完全控制发生在它们身上的事情,但我真的很感兴趣更有经验的人对此的看法。

pau*_*els 1

我认为您不会找到可以直接执行您需要的操作的操作员。

不过,创建自己的运算符并没有什么问题:

var source = //An observable of observables of form data

Observable.prototype.combineLatestObservable = function(resultSelector) {
  var source = this;
  return Rx.Observable.create(function(obs) {
    var disposable = new Rx.SerialDisposable();
    var sources= [];
    return source.subscribe(
      function(x) {
        //Update the set of observables
        sources.push(x);
        //This will dispose of the previous subscription first
        //then subscribe to the new set.
        disposable.seDisposable(Rx.Observable.combineLatest(sources, resultSelector)
                                             .subscribe(obs));
      }, 
      function(e) { obs.onError(e); }, 
      function() { obs.onCompleted(); });
  }).share();
}
Run Code Online (Sandbox Code Playgroud)

或者,如果您想与运算符一起执行此操作:

//Have to use arguments since we don't know how many values we will have
function sums() {
  var sum = 0;
  for (var i = 0, len = arguments.length; i < len; ++i) { 
    sum += arguments[i]; 
  }
  return sum;
}

source
//Capture the latest set of Observables
.scan([], function(acc, x) {
  acc.push(x);
  return acc;
})
//Dispose of the previous set and subscribe to the new set
.flatMapLatest(function(arr) {
  return Observable.combineLatest(arr, sums);
})
//Don't know how many subscribers you have but probably want to keep from 
//recreating this stream for each
.share();
Run Code Online (Sandbox Code Playgroud)