如何在发射之前使一个RxJS Observable序列等待另一个RxJS Observable序列完成?

Ste*_*hen 56 javascript observable rxjs

说我有一个Observable,就像这样:

var one = someObservable.take(1);

one.subscribe(function(){ /* do something */ });
Run Code Online (Sandbox Code Playgroud)

然后,我有第二个观察:

var two = someOtherObservable.take(1);
Run Code Online (Sandbox Code Playgroud)

现在,我想订阅Observable,但我想确保Observablesubscribe()订阅者被解雇之前已经完成.我可以用什么样的缓冲方法two让第二个等待第一个完成?

我想我想暂停one直到two完成.

pau*_*els 36

我能想到几种方式

import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'

//Method one

var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
Run Code Online (Sandbox Code Playgroud)

  • @mspasiuk根据OP的要求,他们只希望第二个订阅*在*第一个完成后.`forkJoin`同时订阅. (7认同)
  • 这个方法是否总是在 subscribe() 函数中的第二个(`two`)之前解析第一个 observable(`one`)? (3认同)
  • 我最终使用了“pause”和“resume”而不是“publish”和“connect”,但示例二本质上是我采取的路线。 (2认同)

And*_*tar 14

这是一种可重复使用的方法(它是打字稿,但您可以将其调整为 js):

function waitFor<T>(signal: Observable<any>) {
    return (source: Observable<T>) => signal.pipe(
        first(),
        switchMap(_ => source),
    );
}
Run Code Online (Sandbox Code Playgroud)

您可以像使用任何运算符一样使用它:

var two = someOtherObservable.pipe(waitFor(one), take(1));
Run Code Online (Sandbox Code Playgroud)

它基本上是一个操作符,它推迟对源 observable 的订阅,直到信号 observable 发出第一个事件。


Sim*_*ver 12

skipUntil()with last()

skipUntil:忽略发出的项目,直到另一个observable发出

last:从序列中发出最后一个值(即等到它完成然后发出)

注意,传递给observable skipUntil只需要发出任何东西来取消跳过,这就是我们需要放置last()的原因.

main$.skipUntil(sequence2$.pipe(last()))
Run Code Online (Sandbox Code Playgroud)

官方:https://rxjs-dev.firebaseapp.com/api/operators/skipUntil


可能的问题:请注意,如果没有发出任何内容,则last()本身会出错.该last()运营商确实有last()配合使用的谓词参数,但只有当.我认为如果这种情况对你来说是个问题(如果default没有发出就可以完成),那么其中一个应该可以工作(目前未经测试):

main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
Run Code Online (Sandbox Code Playgroud)

请注意,这sequence2$是一个有效的项目,但实际上可以是任何值.另请注意,这是连接管道undefined而不是sequence2$管道.


Nik*_*kos 11

如果要确保保留执行顺序,可以使用flatMap作为以下示例

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));
Run Code Online (Sandbox Code Playgroud)

结果将是:

"1"
"11"
"111"
"finished"
Run Code Online (Sandbox Code Playgroud)


Joe*_*ing 9

这是利用switchMap的结果选择器的另一种可能性

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => {
    /* do something */ 
  });
Run Code Online (Sandbox Code Playgroud)


Ant*_*ton 5

如果第二个可观察对象很热,则有另一种方法可以暂停/恢复

var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);

source1.doOnCompleted(function () { 
  /* resume paused source2 */ 
  pauser.onNext(true);
}).subscribe(function(){
  // do something
});

source2.subscribe(function(){
  // start to recieve data 
});
Run Code Online (Sandbox Code Playgroud)

您也可以使用缓冲版本pausableBuffered在暂停期间保持数据。


Ser*_*giu 5

这是一个用 TypeScript 编写的自定义运算符,它在发出结果之前等待信号:

export function waitFor<T>(
    signal$: Observable<any>
) {
    return (source$: Observable<T>) =>
        new Observable<T>(observer => {
            // combineLatest emits the first value only when
            // both source and signal emitted at least once
            combineLatest([
                source$,
                signal$.pipe(
                    first(),
                ),
            ])
                .subscribe(([v]) => observer.next(v));
        });
}
Run Code Online (Sandbox Code Playgroud)

你可以这样使用它:

two.pipe(waitFor(one))
   .subscribe(value => ...);
Run Code Online (Sandbox Code Playgroud)