第一:这是我使用RxJs的第一个项目,我想我会通过使用它来学习.
我找到了这个答案:将分页请求转换为带有RxJs的Observable流 但是它在评论中说:
你仍然超过最大调用堆栈.大约430页返回.我认为递归可能不是最好的解决方案
我想查询Youtube Data API,结果以页面形式返回,我需要对它们进行分页.我想象这样的工作流程可以工作:1)发起呼叫2)检查响应是否有'nextPageToken'3)如果有,请对Youtube API做另一个请求4)如果没有,完成
So to do this I could Imagine the following Observables / streams:
FirstRequestStream -A-X--------------->
ResponseStream -A-A-A-A--X-------->
RequestStream -I-A-I-A----------->
A = Action
I = Info from upper stream
X = Termination
Run Code Online (Sandbox Code Playgroud)
(不确定这个图表是否与我制作的方式一致)
所以ResponseStream依赖于FirstRequestStream和RequestStream(使用merge函数).RequestStream依赖于ResponseStream(这被称为循环可观察的吗?)
- 这是正确的方法吗?
- "流通的观察者"是一件好事,他们甚至可能吗?(我创造了一个问题).
- 我应该先尝试其他方式吗?
- 是否有可能创建相互依赖的可观察流?
谢谢您的帮助.
Ole*_*luk 25
你使这个问题过于复杂,使用延迟运算符可以更容易地解决它.
想法是你正在创建延迟的observable(因此它将被创建并仅在订阅后开始获取数据)并将其与相同的observable连接,但是对于下一页,它也将与下一页连接,依此类推. ...... 所有这一切都可以在没有递归的情况下完成.
以下是代码的外观:
const { defer, from, concat, EMPTY, timer } = rxjs; // = require("rxjs")
const { mergeMap, take, mapTo, tap } = rxjs.operators; // = require("rxjs/operators")
// simulate network request
function fetchPage(page=0) {
return timer(100).pipe(
tap(() => console.log(`-> fetched page ${page}`)),
mapTo({
items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
nextPage: page + 1,
})
);
}
const getItems = page => defer(() => fetchPage(page)).pipe(
mergeMap(({ items, nextPage }) => {
const items$ = from(items);
const next$ = nextPage ? getItems(nextPage) : EMPTY;
return concat(items$, next$);
})
);
// process only first 30 items, without fetching all of the data
getItems()
.pipe(take(30))
.subscribe(e => console.log(e));Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>Run Code Online (Sandbox Code Playgroud)
LuJ*_*aks 16
下面是使用rxjs经营我的解决方案expand,reduce并empty使用HttpClient的模块:
假设您的 API 响应是一个包含如下形状的对象
interface Response {
data: items[]; // array of result items
next: string|null; // url of next page, or null if there are no more items
}
Run Code Online (Sandbox Code Playgroud)
你可以像这样使用 expand 和 reduce
getAllResults(url) {
return this.http.get(url).pipe(
expand((res) => res.next ? this.http.get(res.next) : EMPTY),
reduce((acc, res) => acc.concat(res.data), [])
);
}
Run Code Online (Sandbox Code Playgroud)
我无耻地重用了来自 Oles Savluk 的代码片段,它的fetchPage功能很好,我应用了 Picci 链接的博客文章中解释的想法(在评论中),使用expand.
它提供了一个稍微简单的代码,在expand调用中隐藏了递归(如果需要,文章的注释显示了如何对其进行线性化)。
const { timer, EMPTY } = rxjs; // = require("rxjs")
const { concatMap, expand, mapTo, tap, toArray } = rxjs.operators; // = require("rxjs/operators")
// simulate network request
const pageNumber = 3;
function fetchPage(page = 0) {
return timer(1000).pipe(
tap(() => console.log(`-> fetched page ${page}`)),
mapTo({
items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
nextPage: ++page === pageNumber ? undefined : page,
}),
);
}
const list = fetchPage().pipe(
expand(({ nextPage }) => nextPage ? fetchPage(nextPage) : EMPTY),
concatMap(({ items }) => items),
// Transforms the stream of numbers (Observable<number>)
// to a stream with only an array of numbers (Observable<number[]>).
// Remove if you want a stream of numbers, not waiting for all requests to complete.
toArray(),
);
list.subscribe(console.log);Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>Run Code Online (Sandbox Code Playgroud)