Ste*_*hen 56 javascript observable rxjs
说我有一个Observable,就像这样:
var one = someObservable.take(1);
one.subscribe(function(){ /* do something */ });
Run Code Online (Sandbox Code Playgroud)
然后,我有第二个观察:
var two = someOtherObservable.take(1);
Run Code Online (Sandbox Code Playgroud)
现在,我想订阅Observable
,但我想确保Observable
在subscribe()
订阅者被解雇之前已经完成.我可以用什么样的缓冲方法two
让第二个等待第一个完成?
我想我想暂停one
直到two
完成.
pau*_*els 36
我能想到几种方式
import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'
//Method one
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});
//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
Run Code Online (Sandbox Code Playgroud)
And*_*tar 14
这是一种可重复使用的方法(它是打字稿,但您可以将其调整为 js):
function waitFor<T>(signal: Observable<any>) {
return (source: Observable<T>) => signal.pipe(
first(),
switchMap(_ => source),
);
}
Run Code Online (Sandbox Code Playgroud)
您可以像使用任何运算符一样使用它:
var two = someOtherObservable.pipe(waitFor(one), take(1));
Run Code Online (Sandbox Code Playgroud)
它基本上是一个操作符,它推迟对源 observable 的订阅,直到信号 observable 发出第一个事件。
Sim*_*ver 12
skipUntil:忽略发出的项目,直到另一个observable发出
last:从序列中发出最后一个值(即等到它完成然后发出)
注意,传递给observable skipUntil
只需要发出任何东西来取消跳过,这就是我们需要放置last()的原因.
main$.skipUntil(sequence2$.pipe(last()))
Run Code Online (Sandbox Code Playgroud)
官方:https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
可能的问题:请注意,如果没有发出任何内容,则last()
本身会出错.该last()
运营商确实有last()
配合使用的谓词参数,但只有当.我认为如果这种情况对你来说是个问题(如果default
没有发出就可以完成),那么其中一个应该可以工作(目前未经测试):
main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
Run Code Online (Sandbox Code Playgroud)
请注意,这sequence2$
是一个有效的项目,但实际上可以是任何值.另请注意,这是连接管道undefined
而不是sequence2$
管道.
Nik*_*kos 11
如果要确保保留执行顺序,可以使用flatMap作为以下示例
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));
first
.flatMap(() => second)
.flatMap(() => third)
.subscribe(()=> console.log('finished'));
Run Code Online (Sandbox Code Playgroud)
结果将是:
"1"
"11"
"111"
"finished"
Run Code Online (Sandbox Code Playgroud)
这是利用switchMap的结果选择器的另一种可能性
var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
/** Wait for first Observable */
() => one$,
/** Only return the value we're actually interested in */
(value2, value1) => value2
)
.subscribe((value2) => {
/* do something */
});
Run Code Online (Sandbox Code Playgroud)
如果第二个可观察对象很热,则有另一种方法可以暂停/恢复:
var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);
source1.doOnCompleted(function () {
/* resume paused source2 */
pauser.onNext(true);
}).subscribe(function(){
// do something
});
source2.subscribe(function(){
// start to recieve data
});
Run Code Online (Sandbox Code Playgroud)
您也可以使用缓冲版本pausableBuffered在暂停期间保持数据。
这是一个用 TypeScript 编写的自定义运算符,它在发出结果之前等待信号:
export function waitFor<T>(
signal$: Observable<any>
) {
return (source$: Observable<T>) =>
new Observable<T>(observer => {
// combineLatest emits the first value only when
// both source and signal emitted at least once
combineLatest([
source$,
signal$.pipe(
first(),
),
])
.subscribe(([v]) => observer.next(v));
});
}
Run Code Online (Sandbox Code Playgroud)
你可以这样使用它:
two.pipe(waitFor(one))
.subscribe(value => ...);
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
76168 次 |
最近记录: |