Ben*_*n M 12 javascript reactive-programming observable rxjs
我有2个嵌套的Observable Streams来做HTTP请求.现在我想显示一个加载指示器,但无法使其正常工作.
var pageStream = Rx.createObservableFunction(_self, 'nextPage')
.startWith(1)
.do(function(pageNumber) {
pendingRequests++;
})
.concatMap(function(pageNumber) {
return MyHTTPService.getPage(pageNumber);
})
.do(function(response) {
pendingRequests--;
});
Rx.createObservableFunction(_self, 'search')
.flatMapLatest(function(e) {
return pageStream;
})
.subscribe();
search();
nextPage(2);
nextPage(3);
search();
Run Code Online (Sandbox Code Playgroud)
这将触发pendingRequests++4次,但pendingRequests--只触发一次,因为flatMapLatest在前3个HTTP响应到达之前将取消内部observable.
我找不到像onCancel回调那样的东西.我也试过onCompleted和onError,但这些不会太受被触发flatMapLatest.
有没有其他方法让这个工作?
谢谢!
示例:单个search()呼叫.
示例:search()并nextPage()致电.(在 search()响应返回之前调用nextPage().)
示例:search(),search().(search()调用相互覆盖,但第一个的响应可以被解除)
例如:search(),nextPage(),search().(再次:由于第二次搜索(),可以忽略前一个search()和nextPage()的响应)
示例:search(),nextPage().但是这次在search()响应回来之后调用nextPage().
我尝试使用pendingRequests计数器,因为我可以同时拥有多个相关请求(例如:) search(), nextPage(), nextPage().当然,我想在所有相关请求完成后禁用加载指示器.
在呼叫时search(), search(),第一个search()是无关紧要的.同样适用search(), nextPage(), search().在这两种情况下,只有一个活跃的相关请求(最后一个search()).
使用switchMapaka,flatMapLatest您希望在新的外部项目到达时尽快修剪当前内部流的执行。这无疑是一个很好的设计决策,否则它会带来很多混乱并允许一些令人毛骨悚然的行为。如果你真的想做某事,onCancel你可以随时使用自定义回调创建你自己的可观察对象unsubscribe。但我仍然建议不要unsubscribe与外部环境的状态变化结合起来。理想情况下,它们unsubscribe只会清理内部使用的资源。
onCancel不过,您的具体情况无需访问或类似即可解决。关键的观察是 - 如果我正确理解你的用例 - 所有search先前/待处理的操作都可能被忽略。因此,我们不必担心计数器递减,只需从 1 开始计数即可。
关于该片段的一些评论:
BehaviorSubject计算待处理请求 - 因为它已准备好与其他流组合;nextPagea 仍处于待处理状态时允许 - 但这似乎只是使用vssearch的问题;concatMapTomergeRx运算符。console.clear();
const searchSub = new Rx.Subject(); // trigger search
const nextPageSub = new Rx.Subject(); // triger nextPage
const pendingSub = new Rx.BehaviorSubject(); // counts number of pending requests
const randDurationFactory = min => max => () => Math.random() * (max - min) + min;
const randDuration = randDurationFactory(250)(750);
const addToPending = n => () => pendingSub.next(pendingSub.value + n);
const inc = addToPending(1);
const dec = addToPending(-1);
const fakeSearch = (x) => Rx.Observable.of(x)
.do(() => console.log(`SEARCH-START: ${x}`))
.flatMap(() =>
Rx.Observable.timer(randDuration())
.do(() => console.log(`SEARCH-SUCCESS: ${x}`)))
const fakeNextPage = (x) => Rx.Observable.of(x)
.do(() => console.log(`NEXT-PAGE-START: ${x}`))
.flatMap(() =>
Rx.Observable.timer(randDuration())
.do(() => console.log(`NEXT-PAGE-SUCCESS: ${x}`)))
// subscribes
searchSub
.do(() => console.warn('NEW_SEARCH'))
.do(() => pendingSub.next(1)) // new search -- ingore current state
.switchMap(
(x) => fakeSearch(x)
.do(dec) // search ended
.concatMapTo(nextPageSub // if you wanted to block nextPage when search still pending
// .merge(nextPageSub // if you wanted to allow nextPage when search still pending
.do(inc) // nexpage started
.flatMap(fakeNextPage) // optionally switchMap
.do(dec) // nextpage ended
)
).subscribe();
pendingSub
.filter(x => x !== undefined) // behavior-value initially not defined
.subscribe(n => console.log('PENDING-REQUESTS', n))
// TEST
const test = () => {
searchSub.next('s1');
nextPageSub.next('p1');
nextPageSub.next('p2');
setTimeout(() => searchSub.next('s2'), 200)
}
// test();
// FUZZY-TEST
const COUNTER_MAX = 50;
const randInterval = randDurationFactory(10)(350);
let counter = 0;
const fuzzyTest = () => {
if (counter % 10 === 0) {
searchSub.next('s' + counter++)
}
nextPageSub.next('p' + counter++);
if (counter < COUNTER_MAX) setTimeout(fuzzyTest, randInterval());
}
fuzzyTest()Run Code Online (Sandbox Code Playgroud)
<script src="https://npmcdn.com/rxjs@5.0.0-beta.11/bundles/Rx.umd.js"></script>Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3982 次 |
| 最近记录: |