我想将 2 个或多个 observable 动态组合成一个组合的 observable。
我了解如何将已经存在的两个 observable 与 组合在一起merge,但是当需要动态添加额外的 observable(例如超时后)时,我该如何解决“合并”问题?
此外,在combinedStream$“动态”合并另一个可观察对象时,不应丢失现有订阅。
这是我到目前为止所拥有的:
const action1$ = interval(1000).pipe(map(data => 'Action1 value:' + data));
const action2$ = interval(1000).pipe(map(data => 'Action2 value:' + data));
const combinedStream$ = merge(action1$, action2$);
combinedStream$.subscribe(data => console.log('Combined Stream Output:', data));
// Add another observable after some time...
setTimeout(() => {
const action3$ = interval(1000).pipe(map(data => 'Action3 value:' + data));
// How add this action3$ to the combined stream ?
}, 1000);
Run Code Online (Sandbox Code Playgroud)
这是我的堆栈闪电战:https ://stackblitz.com/edit/rxjs-s2cyzj
max*_*992 10
在处理该用例时,最简单的方法是拥有一个 observable... of observable,然后使用更高阶的函数,如concatAll、switch、mergeAll ...
const action1$: Observable<string> = interval(2000).pipe(
map(data => "Action1 value:" + data)
);
const action2$: Observable<string> = interval(2000).pipe(
map(data => "Action2 value:" + data)
);
const action3$: Observable<string> = interval(2000).pipe(
map(data => "Action3 value:" + data)
);
const mainStream$$: Subject<Observable<string>> = new Subject();
const combinedStream$ = mainStream$$.pipe(mergeAll());
combinedStream$.subscribe(data => console.log("Combined Stream Output:", data));
mainStream$$.next(action1$);
mainStream$$.next(action2$);
// Add another stream after some time...
setTimeout(() => {
mainStream$$.next(action3$);
}, 1000);
Run Code Online (Sandbox Code Playgroud)
演示:https : //stackblitz.com/edit/rxjs-stwvtc?file=index.ts
| 归档时间: |
|
| 查看次数: |
483 次 |
| 最近记录: |