RxJs Observable Pagination

jju*_*9jj 16 pagination rxjs

第一:这是我使用RxJs的第一个项目,我想我会通过使用它来学习.

我找到了这个答案:将分页请求转换为带有RxJs的Observable流 但是它在评论中说:

你仍然超过最大调用堆栈.大约430页返回.我认为递归可能不是最好的解决方案

我想查询Youtube Data API,结果以页面形式返回,我需要对它们进行分页.我想象这样的工作流程可以工作:1)发起呼叫2)检查响应是否有'nextPageToken'3)如果有,请对Youtube API做另一个请求4)如果没有,完成

So to do this I could Imagine the following Observables / streams:
FirstRequestStream -A-X--------------->
ResponseStream     -A-A-A-A--X-------->
RequestStream      -I-A-I-A----------->
A = Action
I = Info from upper stream
X = Termination
Run Code Online (Sandbox Code Playgroud)

(不确定这个图表是否与我制作的方式一致)

所以ResponseStream依赖于FirstRequestStream和RequestStream(使用merge函数).RequestStream依赖于ResponseStream(这被称为循环可观察的吗?)

- 这是正确的方法吗?

- "流通的观察者"是一件好事,他们甚至可能吗?(我创造了一个问题).

- 我应该先尝试其他方式吗?

- 是否有可能创建相互依赖的可观察流?

谢谢您的帮助.

Ole*_*luk 25

你使这个问题过于复杂,使用延迟运算符可以更容易地解决它.

想法是你正在创建延迟的observable(因此它将被创建并仅在订阅后开始获取数据)并将其与相同的observable连接,但是对于下一页,它也将与下一页连接,依此类推. ...... 所有这一切都可以在没有递归的情况下完成.

以下是代码的外观:

const { defer, from, concat, EMPTY, timer } = rxjs; // = require("rxjs")
const { mergeMap, take, mapTo, tap } = rxjs.operators; // = require("rxjs/operators")

// simulate network request
function fetchPage(page=0) {
  return timer(100).pipe(
    tap(() => console.log(`-> fetched page ${page}`)),
    mapTo({
      items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
      nextPage: page + 1,
    })
  );
}

const getItems = page => defer(() => fetchPage(page)).pipe(
  mergeMap(({ items, nextPage }) => {
    const items$ = from(items);
    const next$ = nextPage ? getItems(nextPage) : EMPTY;
    return concat(items$, next$);
  })
);

// process only first 30 items, without fetching all of the data
getItems()
 .pipe(take(30))
 .subscribe(e => console.log(e));
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>
Run Code Online (Sandbox Code Playgroud)


LuJ*_*aks 16

下面是使用rxjs经营我的解决方案expandreduceempty使用HttpClient的模块:

假设您的 API 响应是一个包含如下形状的对象

interface Response {
  data: items[]; // array of result items
  next: string|null; // url of next page, or null if there are no more items
}
Run Code Online (Sandbox Code Playgroud)

你可以像这样使用 expand 和 reduce

getAllResults(url) {
  return this.http.get(url).pipe(
    expand((res) => res.next ? this.http.get(res.next) : EMPTY),
    reduce((acc, res) => acc.concat(res.data), [])
  );
}
Run Code Online (Sandbox Code Playgroud)


Phi*_*Lho 9

我无耻地重用了来自 Oles Savluk 的代码片段,它的fetchPage功能很好,我应用了 Picci 链接的博客文章中解释的想法(在评论中),使用expand.

Nicholas Jamieson 关于扩展的文章

它提供了一个稍微简单的代码,在expand调用中隐藏了递归(如果需要,文章的注释显示了如何对其进行线性化)。

const { timer, EMPTY } = rxjs; // = require("rxjs")
const { concatMap, expand, mapTo, tap, toArray } = rxjs.operators; // = require("rxjs/operators")

// simulate network request
const pageNumber = 3;
function fetchPage(page = 0) {
  return timer(1000).pipe(
    tap(() => console.log(`-> fetched page ${page}`)),
    mapTo({
      items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
      nextPage: ++page === pageNumber ? undefined : page,
    }),
  );
}

const list = fetchPage().pipe(
  expand(({ nextPage }) => nextPage ? fetchPage(nextPage) : EMPTY),
  concatMap(({ items }) => items),
  // Transforms the stream of numbers (Observable<number>)
  // to a stream with only an array of numbers (Observable<number[]>).
  // Remove if you want a stream of numbers, not waiting for all requests to complete.
  toArray(),
);

list.subscribe(console.log);
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>
Run Code Online (Sandbox Code Playgroud)