Observer 是 RxJS 中 Observable 的“监听器”吗?

Cer*_*ean 1 javascript observable rxjs

我正在学习 RxJS 并且对“侦听器”在哪里(在 Observable 或 Observer 中)、它们如何订阅/取消订阅以及当观察者“不再对”Observable 感兴趣时会发生什么感到困惑,例如当您使用take或 时takeUntil

对于第一部分——什么订阅了什么,什么是听众——我对这些陈述之间的看似矛盾感到困惑。从http://reactivex.io/rxjs/manual/overview.html我们读到 Observers 不是 Observables 的“听众”

这与 addEventListener / removeEventListener 等事件处理程序 API 截然不同。使用 observable.subscribe 时,给定的 Observer 不会在 Observable 中注册为侦听器。Observable 甚至不维护附加的观察者列表。

但在http://reactivex.io/learnrx/它说(练习 30)(突出显示我的)

基于事件的 Observable 永远不会自行完成。take() 函数创建一个新序列,该序列在离散数量的项目到达后完成。这很重要,因为与 Event 不同,当 Observable 序列完成时,它会取消订阅所有侦听器。这意味着如果我们使用 take() 来完成我们的 Event 序列,我们不需要取消订阅!

这对我来说似乎是矛盾的。例如,当您使用 设置 Observable 时,fromEvent事件侦听器在哪里?take(1)例如,当您在基于 DOM 事件的 Observable 上使用 时,在将第一个事件发送给观察者之后会发生什么?Observer 是否取消了 Observable 的订阅,它继续发出事件,只是 Observer 不再听它们了?或者 Observable 是否以某种方式取消了 Observer 的订阅,即 eventListener 在 Observable 中,而不是 Observer 中?

感谢您提供任何线索——显然我不是只见树木不见森林,而是我正在研究的教程,虽然他们擅长从概念上解释它,但让我对实际发生的事情感到困惑。

Nic*_*wer 5

第一部分是相当讲究它的单词使用,以强调订阅 observable 是调用一个函数(或更可能是一个函数链)来运行它们包含的所有代码的问题。第二部分对其措辞不太讲究,但实际上并不是在谈论同一件事。如果你愿意,第二部分最好写成“当一个 observable 完成时,它会调用它的观察者的拆卸逻辑。

当我说订阅一个 observable 是一个调用函数链的问题时,让我试着描述我的意思。考虑以下超级简单的例子:

对于一个超级简单的例子,假设我创建了这个 observable:

const justOne = Rx.Observable.create(function realSubscribe(observer) {
  observer.next(1);
  observer.complete();
});

justOne.subscribe(val => console.log(val));
Run Code Online (Sandbox Code Playgroud)

如果我然后调用justOne.subscribe(val => console.log(val)),这样做将立即调用我命名为 realSubscribe 的函数。然后执行observer.next(1),这导致注销 val,然后执行observer.complete()。就是这样。

在这个过程中,observable 没有创建或增加订阅者列表;它只是按顺序运行代码,然后就完成了。


现在转到一个更现实的例子,让我们考虑fromEvent. 如果我要实现它,它可能看起来像这样(真正的实现更复杂,但这得到了它的要点):

function fromEvent(element, eventName) {
  return Rx.Observable.create(function subscribeToEvent(observer) {
    element.addEventListener(eventName, observer.next);
    return function cleanup() {
      element.removeEventListener(eventName, observer.next);
    }
  });
}

const observable = fromEvent(document, 'click');
const subscription = observable.subscribe(event => console.log(event));
Run Code Online (Sandbox Code Playgroud)

现在,当我调用 observable.subscribe 时,它​​会运行 subscribeToEvent,并在此过程中调用文档上的 addEventListener。document.addEventListener确实导致文档保留了一个事件侦听器列表,但这是因为 addEventListener 的实现方式,而不是所有可观察对象的共同点。可观察对象本身不会跟踪任何侦听器。它只是调用它被告知要调用的内容,然后返回一个清理函数。


接下来让我们看看take。和以前一样,真正的实现更复杂,但它的作用大致如下:

// In the real `take`, you don't need to pass in another observable since that's
// available automatically from the context you called it in. But my sample code
// has to get it somehow.
function take(count, otherObservable) {
  return new Observable(function subscribeToTake(observer) {
    let soFar = 0;
    otherObservable.subscribe((value) => {
      observer.next(value);
      soFar++;
      if (soFar >= count) {
        observer.complete();
      }
    });
  });
}

const clickObservable = fromEvent(document, 'click');
take(1, clickObservable).subscribe(event => console.log(event))
Run Code Online (Sandbox Code Playgroud)

正如评论中提到的,我使用的语法与它在 rxjs 中的使用方式不太匹配,但那是因为要模仿它需要更完整的实现。无论如何,要引起您注意的主要事情是我们开始生成一系列功能:

当我打电话时.subscribe,它会调用 subscribeToTake。这设置了一个计数器,然后调用 otherObservable.subscribe,即 subscribeToEvent。subscribeToEvent 然后调用 document.addEventListener。

Take 的工作是坐在这个功能链的中间。它跟踪到目前为止已经发出了多少值。如果计数足够低,它只会转发值。但是一旦达到计数,它就会调用完成,从而结束 observable。调用 complete 会导致 observable 运行它拥有的任何拆卸逻辑,或者它的链拥有的任何东西。没有拆卸逻辑take,但fromEvent会运行一些拆卸逻辑来移除事件侦听器。