RxJs - 仅当有订阅者时才计算和发出值

Umu*_*zer 3 rxjs rxjs5

我想创建一个发出文件添加/删除(通过chokidar)的 observable 。我可以通过这样的方式做到这一点:

Rx.Observable.create((subscriber) => {
  this.watcher = chokidar.watch(
     this.contentPath
  );
  this.watcher.on('addDir', () => { subscriber.next(); });
  this.watcher.on('unlinkDir', () => { subscriber.next(); });
});
Run Code Online (Sandbox Code Playgroud)

我想要做的是,如果没有订阅者,我想停止观看文件,并在订阅时重新开始。像这样的东西,但使用 RxJs:

class Notifier {
  constructor() {
    this.subscriberCount = 0;
  }

  subscribe(onNext, onError, complete) {
    this.subscriberCount++;
    if (this.subscriberCount === 1) {
      this.startInternalWatcher();
    }
    return () => {
      this.subscriberCount--;
      if (this.subscriberCount === 0) {
        this.stopInternalWatcher();
      }
    }
  }
}

// files are not watched
const n = new Notifier();

const s1 = n.subscribe(() => {}) // files are being wacthed
const s2 = n.subscribe(() => {}) // files are being wacthed
s1() // unsubscribed from 1, files are still watched.
s2() // unsubscribed from 2, files are not watched because no one is interested in.
Run Code Online (Sandbox Code Playgroud)

我是 RxJs 的新手,所以我可能会错过一些明显的解决方案。这可能吗?

Ric*_*lay 5

你在正确的轨道上。首先,如果您从创建者返回一个函数,它将在订阅取消时被调用,因此您可以使用它来销毁观察者。

这应该可以解决您的大部分问题,但是如果您想确保一次最多有一个“观察者”,您可以继续refCount

return Rx.Observable.create((subscriber) => {
  this.watcher = chokidar.watch(
     this.contentPath
  );
  this.watcher.on('addDir', () => { subscriber.next(); });
  this.watcher.on('unlinkDir', () => { subscriber.next(); });

  return () => this.watcher.off('addDir unlinkDir');
})
.publish()
.refCount();
Run Code Online (Sandbox Code Playgroud)