RxJS:如何让一个Observer处理多个Observables?

JBC*_*BCP 6 javascript reactive-programming rxjs

我正在使用一个调用我实现的函数的框架.我希望将此函数的参数转换为Observable,并通过一系列Observers发送.我以为我可以使用一个主题,但它没有像我预期的那样表现.

为了澄清,我有类似下面的代码.我认为Option 1下面会有效,但到目前为止我一直在努力Option 2,这似乎不是惯用语.

var eventSubject = new Rx.Subject();
var resultSource = eventSubject.map(processEvent);
var subscription = resultSource.subscribe(
    function(event) {
        console.log("got event", event);
    },
    function(e) {
        log.error(e);
    },
    function() {
        console.log('eventSubject onCompleted');
    }
);

// The framework calls this method
function onEvent(eventArray) {

   var eventSource = Rx.Observable.from(eventArray);

   // Option 1: I thought this would work, but it doesn't
   // eventSource.subscribe(eventSubject);

   // Option 2: This does work, but its obviously clunky
  eventSource.subscribe(
      function(event) {
          log.debug("sending to subject");
          eventSubject.onNext(event);
      },
      function(e) {
          log.error(e);
      },
      function() {
          console.log('eventSource onCompleted');
      }
  );
}
Run Code Online (Sandbox Code Playgroud)

Bra*_*don 2

只是当您将整个Subject可观察对象订阅时,您最终将该onComplete可观察对象的事件订阅给了主题。这意味着当可观察完成时,它将完成你的主题。因此,当您获得下一组事件时,它们不会执行任何操作,因为主题已经完成。

一种解决方案正是您所做的。只需将事件订阅onNext到主题即可。

第二种解决方案,可以说更像“rx”,是将传入的数据视为可观察数据的可观察数据,并使用mergeAll压平此可观察数据流:

var eventSubject = new Rx.Subject(); // observes observable sequences
var resultSource = eventSubject
  .mergeAll() // subscribe to each inner observable as it arrives
  .map(processEvent);
var subscription = resultSource.subscribe(
    function(event) {
        console.log("got event", event);
    },
    function(e) {
        log.error(e);
    },
    function() {
        console.log('eventSubject onCompleted');
    }
);

// The framework calls this method
function onEvent(eventArray) {

   var eventSource = Rx.Observable.from(eventArray);
   // send a new inner observable sequence
   // into the subject
   eventSubject.onNext(eventSource);
}
Run Code Online (Sandbox Code Playgroud)

更新:这是一个正在运行的示例