RxJs flatMapLatest/switchMap取消回调.onCancel()在哪里?

Ben*_*n M 12 javascript reactive-programming observable rxjs

我有2个嵌套的Observable Streams来做HTTP请求.现在我想显示一个加载指示器,但无法使其正常工作.

var pageStream = Rx.createObservableFunction(_self, 'nextPage')
        .startWith(1)
        .do(function(pageNumber) {
            pendingRequests++;
        })
        .concatMap(function(pageNumber) {
            return MyHTTPService.getPage(pageNumber);
        })
        .do(function(response) {
            pendingRequests--;
        });

Rx.createObservableFunction(_self, 'search')
        .flatMapLatest(function(e) {
            return pageStream;
        })
        .subscribe();



search();
nextPage(2);
nextPage(3);
search();
Run Code Online (Sandbox Code Playgroud)

这将触发pendingRequests++4次,但pendingRequests--只触发一次,因为flatMapLatest在前3个HTTP响应到达之前将取消内部observable.

我找不到像onCancel回调那样的东西.我也试过onCompletedonError,但这些不会太受被触发flatMapLatest.

有没有其他方法让这个工作?

谢谢!

编辑:所需的加载指示器行为

  1. 示例:单个search()呼叫.

    • search() - >开始加载指标
    • 当search()响应返回时 - >禁用加载指示符
  2. 示例:search()nextPage()致电.( search()响应返回之前调用nextPage().)

    • search() - >开始加载指标
    • nextPage() - >指标已经启动,但这里没什么可做的
    • 两个响应到达后停止加载指示器
  3. 示例:search(),search().(search()调用相互覆盖,但第一个的响应可以被解除)

    • search() - >开始加载指标
    • search() - >指标已经启动,但这里没什么可做的
    • 第二次搜索()的响应到达时停止加载指示符
  4. 例如:search(),nextPage(),search().(再次:由于第二次搜索(),可以忽略前一个search()和nextPage()的响应)

    • search() - >开始加载指标
    • nextPage() - >指标已经启动,但这里没什么可做的
    • search() - >指标已经启动,但这里没什么可做的
    • 第二次搜索()的响应到达时停止加载指示符
  5. 示例:search(),nextPage().但是这次在search()响应回来之后调用nextPage().

    • search() - >开始加载指标
    • 停止加载指标,因为search()响应到了
    • nextPage() - >开始加载指标
    • 停止加载指标,因为nextPage()响应到了

我尝试使用pendingRequests计数器,因为我可以同时拥有多个相关请求(例如:) search(), nextPage(), nextPage().当然,我想所有相关请求完成禁用加载指示器.

在呼叫时search(), search(),第一个search()是无关紧要的.同样适用search(), nextPage(), search().在这两种情况下,只有一个活跃的相关请求(最后一个search()).

art*_*iak 3

使用switchMapaka,flatMapLatest您希望在新的外部项目到达时尽快修剪当前内部流的执行。这无疑是一个很好的设计决策,否则它会带来很多混乱并允许一些令人毛骨悚然的行为。如果你真的想做某事,onCancel你可以随时使用自定义回调创建你自己的可观察对象unsubscribe。但我仍然建议不要unsubscribe与外部环境的状态变化结合起来。理想情况下,它们unsubscribe只会清理内部使用的资源。

onCancel不过,您的具体情况无需访问或类似即可解决。关键的观察是 - 如果我正确理解你的用例 - 所有search先前/待处理的操作都可能被忽略。因此,我们不必担心计数器递减,只需从 1 开始计数即可。

关于该片段的一些评论:

  • 用于BehaviorSubject计算待处理请求 - 因为它已准备好与其他流组合;
  • 检查了您在问题中发布的所有案例并且它们有效;
  • 添加了一些模糊测试来证明正确性;
  • 不确定您是否想在nextPagea 仍处于待处理状态时允许 - 但这似乎只是使用vssearch的问题;concatMapTomerge
  • 仅使用标准Rx运算符。

PLNKR

console.clear();

const searchSub = new Rx.Subject(); // trigger search 
const nextPageSub = new Rx.Subject(); // triger nextPage
const pendingSub = new Rx.BehaviorSubject(); // counts number of pending requests

const randDurationFactory = min => max => () => Math.random() * (max - min) + min;
const randDuration = randDurationFactory(250)(750);
const addToPending = n => () => pendingSub.next(pendingSub.value + n);
const inc = addToPending(1);
const dec = addToPending(-1);

const fakeSearch = (x) => Rx.Observable.of(x)
  .do(() => console.log(`SEARCH-START: ${x}`))
  .flatMap(() => 
    Rx.Observable.timer(randDuration())
    .do(() => console.log(`SEARCH-SUCCESS: ${x}`)))

const fakeNextPage = (x) => Rx.Observable.of(x)
  .do(() => console.log(`NEXT-PAGE-START: ${x}`))
  .flatMap(() =>
    Rx.Observable.timer(randDuration())
    .do(() => console.log(`NEXT-PAGE-SUCCESS: ${x}`)))

// subscribes
searchSub
  .do(() => console.warn('NEW_SEARCH'))
  .do(() => pendingSub.next(1)) // new search -- ingore current state
  .switchMap(
    (x) => fakeSearch(x)
    .do(dec) // search ended
    .concatMapTo(nextPageSub // if you wanted to block nextPage when search still pending
      // .merge(nextPageSub // if you wanted to allow nextPage when search still pending
      .do(inc) // nexpage started
      .flatMap(fakeNextPage) // optionally switchMap
      .do(dec) // nextpage ended
    )
  ).subscribe();

pendingSub
  .filter(x => x !== undefined) // behavior-value initially not defined
  .subscribe(n => console.log('PENDING-REQUESTS', n))

// TEST
const test = () => {
    searchSub.next('s1');
    nextPageSub.next('p1');
    nextPageSub.next('p2');

    setTimeout(() => searchSub.next('s2'), 200)
  }
// test();

// FUZZY-TEST
const COUNTER_MAX = 50;
const randInterval = randDurationFactory(10)(350);
let counter = 0;
const fuzzyTest = () => {
  if (counter % 10 === 0) {
    searchSub.next('s' + counter++)
  }
  nextPageSub.next('p' + counter++);
  if (counter < COUNTER_MAX) setTimeout(fuzzyTest, randInterval());
}

fuzzyTest()
Run Code Online (Sandbox Code Playgroud)
<script src="https://npmcdn.com/rxjs@5.0.0-beta.11/bundles/Rx.umd.js"></script>
Run Code Online (Sandbox Code Playgroud)