Dal*_*lie 7 merge observable rxjs
一开始我的印象combineLast是很合适,但是当我阅读文档时,它似乎不是:“请注意,combineLatest在每个可观察对象发出至少一个值之前,它不会发出初始值。”......当然我正好碰到那个异常。我试着forkJoin和merge不同的组合,但我不能得到它的权利。
用例非常简单,该方法someObs返回一个0或更多的可观察对象,我在其上循环。基于SomeObs对象上的一个值,我将一个新构造的 observable 推入OtherObs[]一个Arrayof Observable<OtherObs[]>。这个数组“需要”被合并成一个单一的 observable,在返回它之前我想做一些转换。
具体来说,我很难用combineLast合适的东西替换操作员……
public obs(start: string, end: string): Observable<Array<OtherObs>> {
return this.someObs().pipe(
mergeMap((someObs: SomeObs[]) => {
let othObs: Array<Observable<OtherObs[]>> = [];
someObs.forEach((sobs: SomeObs) => {
othObs.push(this.yetAnotherObs(sobs));
});
return combineLatest<Event[]>(othObs).pipe(
map(arr => arr.reduce((acc, cur) => acc.concat(cur)))
);
})
);
}
private yetAnotherObs(): Observable<OtherObs[]> {
/// ...
}
private somObs(): Observable<SomeObs[]> {
/// ...
}
Run Code Online (Sandbox Code Playgroud)
Pau*_*aul 10
的“问题”combineLatest是(如您所说)它“在每个可观察对象发出至少一个值之前不会发出初始值”。但这不是问题,因为您可以使用 RxJS 运算符startWith。
所以你的 observable 得到一个初始值并且combineLatest像魅力一样工作 ;)
import { of, combineLatest } from 'rxjs';
import { delay, map, startWith } from 'rxjs/operators';
// Delay to show that this observable needs some time
const delayedObservable$ = of([10, 9, 8]).pipe(delay(5000));
// First Observable
const observable1$ = of([1, 2, 3]);
// Second observable that starts with empty array if no data
const observable2$ = delayedObservable$.pipe(startWith([]));
// Combine observable1$ and observable2$
combineLatest(observable1$, observable2$)
.pipe(
map(([one, two]) => {
// Because we start with an empty array, we already get data from the beginning
// After 5 seconds we also get data from observable2$
console.log('observable1$', one);
console.log('observable2$', two);
// Concat only to show that we map boths arrays to one array
return one.concat(two);
})
)
.subscribe(data => {
// After 0 seconds: [ 1, 2, 3 ]
// After 5 seconds: [ 1, 2, 3, 10, 9, 8 ]
console.log(data);
});
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
9667 次 |
| 最近记录: |