fla*_*k37 2 javascript reactive-programming rxjs angular
我想要实现的是这个(使用 Angular 2/Typescript):
Observable A 产生事件流。
对于 Observable A 的每个事件,进行 8 次不同的 http 调用。(8个切换图)
在所有 8 个请求返回后,做一些事情(订阅 8 个 switchmap 的 zip)。
对 Observable A 的每个事件重复 8 次请求(由 switchmap 和 zip 处理)
代码:(完整代码在https://plnkr.co/edit/44yqw0RYzC7v1TFACMx1)
let source = Observable
.interval(5000)
.take(100);
let requests = [];
for(let i=0; i<8;i++) {
let request = source.switchMap(x=> http.get('https://jsonplaceholder.typicode.com/users/'+(i+1))).publish();
request.subscribe(res => console.log(res.json()));
requests.push(request);
}
Observable.zip(requests)
.subscribe(console.log("All requests completed"));
requests.forEach(r => r.connect());
Run Code Online (Sandbox Code Playgroud)
问题是我的 zip 永远不会被调用。我控制台.记录了对 8 个 switchmap 中的每一个的订阅,并且每次在 Observable/stream A 中有一个事件时,我得到的日志显示 8 个 http 调用成功返回。(也可以在网络选项卡中看到 8 个调用返回调试工具)
但是 zip 从不发出任何东西。
如果我尝试不同的(不太理想的)方法:
代码:(完整代码在https://plnkr.co/edit/GqQde1Ae2licBjtL0jcj)
let source = Observable
.interval(5000)
.take(100);
source.subscribe(x=> {
console.log(x);
let requests = [];
for(let i=0; i<8;i++) {
let request = http.get('https://jsonplaceholder.typicode.com/users/'+(i+1)).publish();
request.subscribe(res => console.log(res.json()));
requests.push(request);
}
Observable.forkJoin(requests)
.subscribe(console.log("All requests completed"));
requests.forEach(r => r.connect());
});
Run Code Online (Sandbox Code Playgroud)
这有效。但是有一个明显的缺陷,即每次 Observable A 发出时,我都会创建 8+1 个嵌套的 observables/subscriptions。
(在这两种情况下,我都使用发布/连接来共享/重用订阅,但即使没有它也存在问题)
如果您zip使用多个参数正确调用并传递要订阅的函数(而不是未定义的 console.log 的结果),则第一个示例将起作用。演示。
Observable.zip(...requests) // <-- spread this
.subscribe(() => console.log("All requests completed")); // <-- pass a function
requests.forEach(r => r.connect());
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6015 次 |
| 最近记录: |