我对"dispose"或"unsubscribe"函数的用途是什么感到困惑,它(可选)从一个可观察的"executor"函数返回,如下所示:
const Rx = require('rxjs');
const obs = Rx.Observable.create(obs => {
// we are in the Observable "executor" function
obs.next(4);
// we return this function, which gets called if we unsubscribe
return function () {
console.log('disposed');
}
});
const s1 = obs.subscribe(
function (v) {
console.log(v);
},
function (e) {
console.log(e);
},
function () {
console.log('complete');
}
);
const s2 = obs.subscribe(
function (v) {
console.log(v);
},
function (e) {
console.log(e);
},
function () {
console.log('complete');
}
);
s1.unsubscribe();
s2.unsubscribe();
Run Code Online (Sandbox Code Playgroud)
令我困惑的是,这样的函数实际上更有可能保留代码中的引用,从而防止垃圾收集.
任何人都可以告诉我在这种情况下返回函数的目的是什么,调用函数是什么,以及它的签名是什么?我无法弄清楚它的相关信息.
我还看到了从执行程序函数返回订阅的更复杂的示例,例如:
let index = 0;
let obsEnqueue = this.obsEnqueue = new Rx.Subject();
this.queueStream = Rx.Observable.create(obs => {
const push = Rx.Subscriber.create(v => {
if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
obs.next(v);
}
});
return obsEnqueue.subscribe(push);
});
Run Code Online (Sandbox Code Playgroud)
这似乎返回订阅而不是简单的函数.任何人都可以解释这是怎么回事?
为了使它成为一个明确的问题,这样做有什么区别:
const sub = new Rx.Subject();
const obs = Rx.Observable.create($obs => {
$obs.next(4);
return sub.subscribe($obs);
});
Run Code Online (Sandbox Code Playgroud)
并且不返回订阅调用的结果:
const sub = new Rx.Subject();
const obs = Rx.Observable.create($obs => {
$obs.next(4);
sub.subscribe($obs);
});
Run Code Online (Sandbox Code Playgroud)
当下游不再侦听流时,将调用需要返回的unsubscribe函数,Rx.Observable.create从而有效地为您提供时间来清理资源.
关于你的问题; .subscribe()返回您可以调用的订阅.unsubscribe().因此,如果您想对其他订阅执行某些操作,则可以通过该订阅管理您的下游:
const obs = Rx.Observable.create($obs => {
const timer = Rx.Observable.interval(300)
.do(i => console.log('emission: ' + i))
return timer.subscribe($obs);
});
obs.take(4).subscribe(i => console.log('outer-emission:'+i))Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>Run Code Online (Sandbox Code Playgroud)
如果没有取消订阅功能,您将停止监听observable,但内部创建的间隔将继续运行:
const obs = Rx.Observable.create($obs => {
const timer = Rx.Observable.interval(300)
.do(i => console.log('emission: ' + i))
.take(10)
.subscribe(
val => $obs.next(val),
err => $obs.error(err),
() => $obs.complete()
);
return function(){} // empty unsubscribe function, internal subscription will keep on running
});
obs.take(4).subscribe(i => console.log('outer-emission:'+i))Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>Run Code Online (Sandbox Code Playgroud)