rxjs观察阵列推送

Mar*_*y B 6 javascript arrays observable rxjs

我想监视何时使用observable将对象推送到数组上.我想从一个空数组开始,当发生推送时,我希望observable检测并处理它,然后等到下一次推送.这与"fromEvent"非常相似,其中observable等待事件.下面的代码立即调用completed(),因为数组是空的,我如何让它等待推送?

    var testArray = [];

    test(){
      var o = {timestamp: new Date()}
      testArray.push(o)
    }

    var o = Observable
        .from(testArray)
        .concatMap( x => {
          return x;
    });

    o.subscribe( 
      x => { console.log("onNext x=",x.timestamp) },
      e => console.log('onError:', e),
      () => {console.log('onCompleted');} );
Run Code Online (Sandbox Code Playgroud)

注意:输入机制不必是数组.任何类型的消息队列对象都适用于我.

Bra*_*rez 8

如果您要做的就是创建一个可以"推送"值的Observable,我建议使用RXJS主题.

const date$ = new Rx.Subject();
date$.next(new Date());
Run Code Online (Sandbox Code Playgroud)

现在您有一个Observable Date对象流,您可以使用该next()方法"推送"到该对象.

如果您确实需要为队列提供中间(非可观察)数据类型,那么我建议使用新的ES6功能代理.

const queue = new Proxy([], {
  set: function(obj, prop, value) {
    if (!isNaN(prop)) {
      date$.next(value)
    }
    obj[prop] = value
    return true
  },
})
Run Code Online (Sandbox Code Playgroud)

现在你有一个代理的数组,这样每当一个值被添加到它时,它就会被添加到你的Observable流中.

  • 我应该补充的一件事是,在开始将值推送到主题之前,您需要订阅主题,因为它会很“热”。 (2认同)

Bal*_*des 3

您可以子类化Array并实现某种通知机制来告诉您何时发生推送(这实际上是简单的):

class CustomArray extends Array {
  push(e) {
    super.push(e)
    if (this._listeners) {
      this._listeners.forEach(l => l(e))
    }
  }
  addPushListener(listener) {
    this._listeners = this._listeners || []
    this._listeners.push(listener)
  }
  removePushListener(listener) {
    if (this._listeners) {
      const index = this._listeners.indexOf(listener)
      if (index >= 0) {
        this._listeners.splice(index, 1)
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

然后使用一个函数,你可以将其包装成一个Observable

const observePushes = array => Rx.Observable.fromEventPattern(
  array.addPushListener.bind(array),
  array.removePushListener.bind(array)
)
Run Code Online (Sandbox Code Playgroud)

然后,您就可以随时订阅更改并取消订阅,就像任何其他可观察的一样。

const arr = new CustomArray()
const pushObservable = observePushes(arr)

const subscription = pushObservable.subscribe(e => console.log(`Added ${e}`))

arr.push(1)
arr.push(2)
arr.push(3)
arr.push("a")

subscription.dispose()

arr.push("b")
Run Code Online (Sandbox Code Playgroud)

另请注意,这Observable永远不会真正完成,因为您在任何时候都无法保证不会再向数组添加任何内容。

小提琴:http://jsfiddle.net/u08daxdv/1/