正确使用takeUntil()停止可观察

Osu*_*kaa 0 observable observer-pattern rxjs rxjs5

我正在尝试创建2个可观察值,它们基本上可以进行逆运算。它是服务发现端点,因此在启动应用程序时,它必须尝试注册服务发现直到成功。所以我想到了创建这样的可观察对象:

const create$ = Rx.Observable.create((observer) => {
    observer.next('Trying to create observation');
    sp.put(endpoint, { json: true }, (err, res, payload) => {
      err ? observer.error(err) : observer.complete();
    });
  });
Run Code Online (Sandbox Code Playgroud)

当应用程序正常关闭时,我想执行逆操作。像这样:

const delete$ = Rx.Observable.create((observer) => {
    console.log('deleted subscribed');
    observer.next('Trying to delete observation');
    sp.delete(endpoint, { json: true }, (err, res, payload) => {
      err ? observer.error(err) : observer.complete();
    });
  });
Run Code Online (Sandbox Code Playgroud)

因此,我决定创建一个函数,该函数返回具有.create()和的对象.delete()。我要解决的问题是,当应用程序启动并尝试注册,但无法到达服务发现端点时,稍后应用程序启动其正常关闭过程并调用该.delete()操作,则该.create()操作不应再运行。

function observe({ url, version, serviceName }) {
  const endpoint = `/endpoint/${serviceName}/${version}/${encodeURIComponent(url)}`;

  const create$ = Rx.Observable.create((observer) => {
    observer.next('Trying to create observation');
    sp.put(endpoint, { json: true }, (err, res, payload) => {
      err ? observer.error(err) : observer.complete();
    });
  });

  const delete$ = Rx.Observable.create((observer) => {
    console.log('deleted subscribed');
    observer.next('Trying to delete observation');
    sp.delete(endpoint, { json: true }, (err, res, payload) => {
      err ? observer.error(err) : observer.complete();
    });
  });

  return {
    create() {
      return create$.retry(Number.POSITIVE_INFINITY).takeUntil(delete$); // This is where I would want to takeUntil()
    },
    delete({ interval = 5000, times = 0 } = {}) {
      return delete$.retry(times);
    },
  }
}
Run Code Online (Sandbox Code Playgroud)

我的问题是,使用.takeUntil()它时,它订阅delete$开始执行删除操作的删除操作,该操作具有create$立即停止可观察到的效果。

我试着做,takeUntil(Observable.merge(Observable.never(), delete$))但是它都订阅了,所以不起作用。我还尝试takeUntil(Observable.concat(Observable.never(), delete$))了第一个永不结束(:P),第二个永不订阅。

jay*_*lps 5

多数可观察物是冷的单播的。这是一个冗长的话题,因此我将主要参考伟大的文章《热与冷可观察者》,但总而言之:

COLD是您的可观察物创建生产者时

// COLD
var cold = new Observable((observer) => {
  var producer = new Producer();
  // have observer listen to producer here
});
Run Code Online (Sandbox Code Playgroud)

HOT是当您的可观察对象关闭生产者时

// HOT
var producer = new Producer();
var hot = new Observable((observer) => {
  // have observer listen to producer here
});
Run Code Online (Sandbox Code Playgroud)

在您的示例中,这是一个关键的区别,因为create$delete$都是冷的。如您所见,提供要订阅delete$takeUntil原因delete$,启动请求。

如果您希望保持代码的结构/ API不变,则一种实现方法是使用被视为“通知程序”的主题。主题是多播且“热门”(即使它们自己不做任何事情)。

function observe({ url, version, serviceName }) {
  // etc...

  const shutdown$ = new Subject(); // <---------------- create our notifier

  return {
    create() {
      return create$
        .retry(Number.POSITIVE_INFINITY)
        .takeUntil(shutdown$); // <-------------------- take our notifier
    },
    delete({ interval = 5000, times = 0 } = {}) {
      return Observable.defer(() => {
        shutdown$.next(); // <------------------------- notify
        return delete$.retry(times);
      });
    }
  };
}
Run Code Online (Sandbox Code Playgroud)

我们使用Observable.defer()shutdown$.next()是为了使每当有人实际订阅我们返回的Observable时,我们都可以执行该副作用。