RxJS:我如何"手动"更新Observable?

Lia*_*amD 124 reactive-programming rxjs

我认为我必须误解一些基本的东西,因为在我看来,这应该是一个可观察的最基本的案例,但对于我的生活,我无法弄清楚如何从文档中做到这一点.

基本上,我希望能够做到这一点:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}
Run Code Online (Sandbox Code Playgroud)

但是我找不到像这样的方法push.我正在使用它作为点击处理程序,我知道他们已经Observable.fromEvent为此,但我正在尝试将它与React一起使用,而我宁愿只能在回调中更新数据流,而不是使用完全不同的事件处理系统.基本上我想要这个:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});
Run Code Online (Sandbox Code Playgroud)

我得到的最接近的是使用observer.onNext('foo'),但这似乎并没有真正发挥作用,并且这是在观察者身上调用的,这似乎并不正确.观察者应该是对数据流作出反应的东西,而不是改变它,对吧?

我只是不理解观察者/可观察的关系吗?

lui*_*iel 139

在RX中,Observer和Observable是不同的实体.观察者订阅了一个Observable.Observable通过调用观察者的方法向其观察者发出项目.如果需要调用范围之外的观察者方法,Observable.create()可以使用Subject,它是一个同时充当观察者和Observable的代理.

你可以这样做:

var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var my_function = function() {
  eventStream.next('foo'); 
}
Run Code Online (Sandbox Code Playgroud)

您可以在此处找到有关主题的更多信息:

  • 只需注意.. onNext函数现在重命名为`next` (27认同)
  • 如果人们无法使用主题而必须使用可观察量,该怎么办? (2认同)

the*_*ana 32

我相信Observable.create()不会将观察者视为回调参数而是发射器.因此,如果要向Observable添加新值,请尝试以下方法:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"
Run Code Online (Sandbox Code Playgroud)

是Subject使得它变得更容易,在同一个对象中提供Observable和Observer,但它并不完全相同,因为当一个observable只向最后一个订阅的观察者发送数据时,Subject允许你将多个观察者订阅到同一个observable,所以有意识地使用它.如果你想修补它,这是一个JsBin.