使用rxjs5在typescript中构建无限滚动列表

Nuv*_*nda 7 rxjs typescript rxjs5

我正在尝试使用TypeScript和rxjs构建无限滚动列表.也就是说,我希望应用程序从后端获取几页结果,然后在用户滚动到底部附近时获取更多结果.

我有一个Observable,用Observable.prototype.expand()它构建,它将给我所有结果,最终从服务器获取所有页面.但是由于Observable的性质,我不能暂停这个过程.一旦我订阅它将不可避免地尽快获取所有结果.我需要一个不同的解决方案,我可以以我需要的速度从结果流中提取.

由于我无法直接从API获取第二页,每个页面都包含我需要获取下一页的信息,事情变得更加复杂.回复如下:

interface GraphApiResponse {
    data?: any[];
    paging?: {
        cursors: {
            before: string,
            after: string
        },
        next?: string,
        previous?: string
    };
}
Run Code Online (Sandbox Code Playgroud)

的存在paging.next表明还有另外一个页面,并paging.cursors.after用于实际检索.

我似乎无法弄清楚如何实现这一点,而不会让它变得混乱.然而,无限列表似乎是一个普遍的问题,它不太可能没有一个好的解决方案.我应该如何实现这一点,而不是把事情搞得一团糟?

我尝试过的事情

可承诺的承诺

我的第一个想法是使用Iterable of Promises,但是我不知道会得到多少结果,迫使我建立一个无限的,Iterable<Promise<Response?>>其Promise将undefined在某一点之后解决.然而,由于它是无限的,我不能正常迭代(它将用Promises填充整个可用内存),实际上当它们处于该形式时使用结果意味着在前一个的解析函数中获取每个Promise.

这个解决方案看起来可能有用,但是我写的每一行都变得不那么容易阅读和复杂了.

将其与行为主体合并

在谷歌搜索问题时,我发现了一个相关的SO问题以及关于rxjs背压的GitHub问题,两者都包含Ben Lesh的代码片段,显然可以用来向Observable添加背压,遗憾的是,无论我尝试什么,我无法获取源Observable发出它的值比生成它们的速度慢,它们总是只是在某处缓冲,这意味着无论如何都会发生网络请求.

来自GitHub:

// this behavior subject is basically your "give me the next batch" mechanism.
// in this example, we're going to make 5 async requests back to back before requesting more.
const BATCH_SIZE = 5;
const requests = new BehaviorSubject(BATCH_SIZE); // start by requesting five items

// for every request, pump out a stream of events that represent how many you have left to fulfill
requests.flatMap((count) => Observable.range(0, count).map(n => count - n - 1))
  // then concat map that into an observable of what you want to control with backpressure
  // you might have some parameterization here you need to handle, this example is simplified
  // handle side effects with a `do` block
  .concatMap(() => getSomeObservableOfDataHere().do(stuffWithIt), (remaining) => remaining)
  // narrow it down to when there are no more left to request,
  // and pump another batch request into the BehaviorSubject
  .filter(remaining => remaining === 0)
  .mapTo(BATCH_SIZE)
  .subscribe(requests);
Run Code Online (Sandbox Code Playgroud)

来自StackOverflow:

// start with 5 values
const controller = new Rx.BehaviorSubject(5);

// some observable source, in this case, an interval.
const source = Rx.Observable.interval(100)

const controlled = controller.flatMap(
      // map your count into a set of values
      (count) => source.take(count), 
      // additional mapping for metadata about when the block is done
      (count, value, _, index) => {
        return { value: value, done: count - index === 1 }; 
      })
      // when the block is done, request 5 more.
      .do(({done}) => done && controller.next(5))
      // we only care about the value for output
      .map(({value}) => value);


// start our subscription
controlled.subscribe(x => {
  console.log(x)
});
Run Code Online (Sandbox Code Playgroud)

我可能错了,但在我看来,一旦我订阅了Observable,它会尽可能快地生成它的值,而无法减慢它,所以这可能不是解决方案.

使用ixjs

似乎ixjs是我的问题的解决方案,但是该存储库在很长一段时间内还没有更新.显然在TypeScript中一个重新实现,但是这似乎是在开发的早期阶段,并没有很好地记录喷射.

我宁愿不依赖于这么少人使用的框架,这实际上是一个非常简单的问题.

重构应用程序

我在线搜索了TypeScript中的无限滚动列表的实现(使用Angular).我目前的方法是拥有一个提供Object的服务,可用于获取所有结果.然后我有一个组件显示它们.替代方案似乎是在查询后端的服务中检查滚动位置,或者在用户滚动时让组件从后端服务获取新的Observable.

这两种解决方案都会迫使我混合使用目前整齐划分的代码.我希望让服务返回一些东西,我可以只是提供给组件,而组件不必知道网络请求,或者服务必须知道滚动位置.

pau*_*els 8

我建议你改用mergeScan运算符.看起来这可能是一个很好的契合.

MergeScan类似于expand运算符,因为它将来自先前请求的数据作为累加器反馈,但expand与之不同,它不会继续运行直到时间结束.

基本上假设您有一个makeRequest(params)接收请求并返回Observable最终解析为响应的函数,而流表示我们将调用的滚动事件fetchMore$,您可以创建一个按需获取服务,如下所示:

// This is abstracted as a simple "fetch" concept but in reality should
// be hooked up to your scroll handler, properly debounced etc.
this.fetchMore$
  .mergeScan(
    // Make the request
    (acc, _) => makeRequest(acc.paging.next ? acc.paging.cursors.after : ''),
    {paging: {}}, // Initial request body
    1 // Maximum concurrency, i.e. how many requests can be in flight at once 
  )
  .pluck('data')
  .subscribe(data => {/*Do something with the data*/});
Run Code Online (Sandbox Code Playgroud)

我将并发性设置为1,因为虽然您可以在飞行中有多个请求,但目前无法保证顺序,因此结果可能是如果用户滚动得非常快,则acc会失去同步,而并发性为1始终会订购数据.