ForkJoin 2 BehaviorSubjects

Laj*_*jos 7 javascript reactive-extensions-js rxjs typescript

我有两个行为主题流我正在尝试forkJoin没有运气.正如我想象的那样,它会回馈它的最后两个值.这可能以某种方式实现吗?

在主题之后不会调用它.

let stream1 = new BehaviorSubject(2);
let stream2 = new BehaviorSubject('two');

Observable.forkJoin(stream1, stream2)
    .subscribe(r => {
         console.log(r);
    });
Run Code Online (Sandbox Code Playgroud)

Ali*_*son 17

如果您不想(或不知道何时)调用complete(),则可以使用combineLatest代替forkJoin

使用combineLatest,当任何可观察源(在您的情况下,您的行为主体)发出一个值时,combineLatest将触发:

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

combineLatest(stream1, stream2)
    .subscribe(r => {
         console.log(r);
    });

stream1.next(3);
stream2.next('three');
Run Code Online (Sandbox Code Playgroud)

控制台日志:

(2) [2, "two"] // 初始状态

(2) [3, "two"] // 下一次在流 1 上触发

(2) [3, "three"] // 下一个在stream2上触发

现场演示:https : //stackblitz.com/edit/rxjs-qzxo3n


mar*_*tin 13

请注意forkJoin()其文档中的实际内容:

并行运行所有可观察序列并收集它们的最后元素.

这意味着forkJoin()所有输入Observable完成时发出一个值.使用BehaviorSubject此功能意味着显式调用complete()它们:

import { Observable, BehaviorSubject, forkJoin } from 'rxjs';

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

forkJoin(stream1, stream2)
  .subscribe(r => {
    console.log(r);
  });

stream1.complete();
stream2.complete();
Run Code Online (Sandbox Code Playgroud)

观看现场演示:http://plnkr.co/edit/MtYfGLgqgHACPswFTVJ5?p = preview


sjc*_*der 5

您可以使用上面提到的take(1)管道或方法。complete()

private subjectStream1 = new BehaviorSubject(null);
stream1$: Observable = this.subjectStream1.asObservable();

private subjectStream2 = new BehaviorSubject(null);
stream2$: Observable = this.subjectStream2.asObservable();

forkJoin({
  stream1: this.stream1$.pipe(take(1)),
  stream2: this.stream2$.pipe(take(1))
})
.pipe(takeUntil(this._destroyed$))
.subscribe(values) => console.log(values));
Run Code Online (Sandbox Code Playgroud)