Osu*_*kaa 0 observable observer-pattern rxjs rxjs5
我正在尝试创建2个可观察值,它们基本上可以进行逆运算。它是服务发现端点,因此在启动应用程序时,它必须尝试注册服务发现直到成功。所以我想到了创建这样的可观察对象:
const create$ = Rx.Observable.create((observer) => {
observer.next('Trying to create observation');
sp.put(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
Run Code Online (Sandbox Code Playgroud)
当应用程序正常关闭时,我想执行逆操作。像这样:
const delete$ = Rx.Observable.create((observer) => {
console.log('deleted subscribed');
observer.next('Trying to delete observation');
sp.delete(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
Run Code Online (Sandbox Code Playgroud)
因此,我决定创建一个函数,该函数返回具有.create()和的对象.delete()。我要解决的问题是,当应用程序启动并尝试注册,但无法到达服务发现端点时,稍后应用程序启动其正常关闭过程并调用该.delete()操作,则该.create()操作不应再运行。
function observe({ url, version, serviceName }) {
const endpoint = `/endpoint/${serviceName}/${version}/${encodeURIComponent(url)}`;
const create$ = Rx.Observable.create((observer) => {
observer.next('Trying to create observation');
sp.put(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
const delete$ = Rx.Observable.create((observer) => {
console.log('deleted subscribed');
observer.next('Trying to delete observation');
sp.delete(endpoint, { json: true }, (err, res, payload) => {
err ? observer.error(err) : observer.complete();
});
});
return {
create() {
return create$.retry(Number.POSITIVE_INFINITY).takeUntil(delete$); // This is where I would want to takeUntil()
},
delete({ interval = 5000, times = 0 } = {}) {
return delete$.retry(times);
},
}
}
Run Code Online (Sandbox Code Playgroud)
我的问题是,使用.takeUntil()它时,它订阅delete$开始执行删除操作的删除操作,该操作具有create$立即停止可观察到的效果。
我试着做,takeUntil(Observable.merge(Observable.never(), delete$))但是它都订阅了,所以不起作用。我还尝试takeUntil(Observable.concat(Observable.never(), delete$))了第一个永不结束(:P),第二个永不订阅。
多数可观察物是冷的和单播的。这是一个冗长的话题,因此我将主要参考伟大的文章《热与冷可观察者》,但总而言之:
COLD是您的可观察物创建生产者时
Run Code Online (Sandbox Code Playgroud)// COLD var cold = new Observable((observer) => { var producer = new Producer(); // have observer listen to producer here });HOT是当您的可观察对象关闭生产者时
Run Code Online (Sandbox Code Playgroud)// HOT var producer = new Producer(); var hot = new Observable((observer) => { // have observer listen to producer here });
在您的示例中,这是一个关键的区别,因为create$和delete$都是冷的。如您所见,提供要订阅delete$的takeUntil原因delete$,启动请求。
如果您希望保持代码的结构/ API不变,则一种实现方法是使用被视为“通知程序”的主题。主题是多播且“热门”(即使它们自己不做任何事情)。
function observe({ url, version, serviceName }) {
// etc...
const shutdown$ = new Subject(); // <---------------- create our notifier
return {
create() {
return create$
.retry(Number.POSITIVE_INFINITY)
.takeUntil(shutdown$); // <-------------------- take our notifier
},
delete({ interval = 5000, times = 0 } = {}) {
return Observable.defer(() => {
shutdown$.next(); // <------------------------- notify
return delete$.retry(times);
});
}
};
}
Run Code Online (Sandbox Code Playgroud)
我们使用Observable.defer()它shutdown$.next()是为了使每当有人实际订阅我们返回的Observable时,我们都可以执行该副作用。
| 归档时间: |
|
| 查看次数: |
3232 次 |
| 最近记录: |