问题:我想创建 Rxjs 组合(函数链),这将导致缓冲来自一个 Observable 的值,直到发生特定事件,然后同步发出所有缓冲值,然后缓冲直到下一个事件。
我使用这个函数来收集所有必须等到我的应用程序调用授权的 http 请求。然后运行所有这些请求。(它在 Angular4 HttpClient Interceptor 中实现),这就是我的用例,但我通常会寻求有关如何创建此类 rx 链的解决方案。
为什么 Rxjs 缓冲区不够用。从我读取和测试的缓冲区要么需要确切的时间范围,要么在获取调度程序而不是时间作为参数的情况下,它在检测到最后一个调度程序的“事件”传播后重新订阅调度程序。我希望它是这样工作的:当出现第一个请求时,我开始缓冲然后订阅调度程序,在调度程序发出后,停止缓冲,重新发送所有缓冲的值,并等待下一个新请求再次开始缓冲和再次启动调度程序。
现在我的解决方案是使用未定义或我的可观察的辅助对象,代码大致如下:
private observable: Observable<boolean>;
makeRequest(): Observable<boolean> {
if (this.observable !== void 0) {
return this.observable;
} else {
this.observable = this.authenticationReuqest()
.share()
.finally(() => this.observable = void 0);
return this.observable;
}
}
Run Code Online (Sandbox Code Playgroud)
通过这种方式,我有点缓冲我的请求,通过 maiking 它们 .delay() 直到相同的多播可观察对象发出,然后在它发出后我就清理它(尽管不需要取消订阅,因为它最终会在完成或出错后清理) .
如果有人知道如何用纯 Rxjs 替换这个解决方案,我很感兴趣。我有一些感觉,虽然我无法得到确切的解决方案,但 Buffer 和 Zip 的某种组合可以使它发生。
谢谢托马斯