如何使用 RxJs 运算符进行递归 HTTP 调用?

Jac*_*ack 2 javascript rxjs typescript reactjs angular

我在 odder 中使用以下方法通过为每个 HTTP 调用传递pageIndex(1) 和pageSize(500) 来检索数据。

this.demoService.geList(1, 500).subscribe(data => {
    this.data = data.items;
});
Run Code Online (Sandbox Code Playgroud)

响应有一个名为的属性isMore,如果为 true,我想修改 odder 中的方法以继续 HTTP 调用isMore。我还需要合并返回的值,最后返回累加的值。

例如,假设有 5000 条记录,并且在第 10 次 HTTP 调用之前,服务将返回 true 值isMore。第 10 次 HTTP 调用后,它返回 false,然后此方法this.data用合并的 5000 条记录设置值。对于这个问题,我应该使用mergeMaporexpand或其他RxJs运算符吗?解决这个问题的正确方法是什么?

更新:我使用以下方法,但它不会合并返回的值,也不会增加 pageIndex。由于这个原因,它不起作用(我尝试进行一些更改,但无法使其工作)。

let pageIndex = 0;
this.demoService.geList(pageIndex+1, 500).pipe(
    expand((data) => {
        if(data.isComplete) {
            return of(EMPTY);
        } else {
            return this.demoService.geList(pageIndex+1, 500);
        }
    })
).subscribe((data) => {
    //your logic here
});
Run Code Online (Sandbox Code Playgroud)

更新二:

of({
    isMore : true,
    pageIndex: 0,
    items: []
  }).pipe(
    expand(data => demoService.geList(data.pageIndex+1, 100)
    .pipe(
      map(newData => ({...newData, pageIndex: data.pageIndex+1}))
    )),
    // takeWhile(data => data.isMore), //when using this, it does not work if the total record is less than 100
    takeWhile(data => (data.isMore || data.pageIndex === 1)), // when using this, it causing +1 extra HTTP call unnecessarily
    map(data => data.items),
    reduce((acc, items) => ([...acc, ...items]))
  )
  .subscribe(data => {
    this.data = data;
  });  
Run Code Online (Sandbox Code Playgroud)

更新三:

最后我通过修改 Eliseo 的方法使其工作,如下所示。但是我**我需要将其设为无效并this.data在此getData()方法中设置参数。我怎样才能做到这一点?

getData(pageIndex, pageSize) {
  return this.demoService.geList(pageIndex, pageSize).pipe(
    switchMap((data: any) => {
      if (data.isMore) {
        return this.getData(pageIndex+1, pageSize).pipe(
          map((res: any) => ({ items: [...data.items, ...res.items] }))
        );
      }
      return of(data);
    })
  );
}
Run Code Online (Sandbox Code Playgroud)

我想将以下订阅部分合并到此方法中,但由于一些错误而无法合并,例如“类型‘void’上不存在属性‘管道’”。

.subscribe((res: any) => {
    this.data = res;
});
Run Code Online (Sandbox Code Playgroud)

Eli*_*seo 6

getData(pageIndex, pageSize) {
    return this.demoService.getList(pageIndex, pageSize).pipe(
      switchMap((data: any) => {
        if (!data.isCompleted) {
          return this.getData(pageIndex+1, pageSize).pipe(
            map((res: any) => ({ data: [...data.data, ...res.data] }))
          );
        }
        return of(data);
      })
    );
  }
Run Code Online (Sandbox Code Playgroud)

stackblitz 注意:我按照 @mbojko 建议更新了 pasing 作为参数 pageIndex+1 -在我写之前pageIndex++

更新2

使用展开运算符,我们需要考虑到我们需要向“递归函数”提供一个带有 pageIndex 的对象(这在我们的调用中是必需的),为此,当我们需要this.demoService.getList(data.pageIndex+1,10)“转换结果”时,添加一个新属性“pageIndex”。为此我们使用“地图”

  getData() {
    //see that initial we create "on fly" an object with properties: pageIndex,data and isCompleted
    return of({
      pageIndex:1,
      data:[],
      isCompleted:false
    }).pipe(
      expand((data: any) => {
        return this.demoService.getList(data.pageIndex,10).pipe(
            //here we use map to create "on fly" and object
            map((x:any)=>({
              pageIndex:data.pageIndex+1, //<--pageIndex the pageIndex +1
              data:[...data.data,...x.data], //<--we concatenate the data using spread operator
              isCompleted:x.isCompleted}))  //<--isCompleted the value
        )
      }),
      takeWhile((data: any) => !data.isCompleted,true), //<--a take while
            //IMPORTANT, use "true" to take account the last call also
      map(res=>res.data)  //finally is we only want the "data" 
                          //we use map to return only this property
    )
  }
Run Code Online (Sandbox Code Playgroud)

好吧,我们可以做一个这样的函数:

  getData() {
    of({pageIndex:1,data:[],isCompleted:false}).pipe(
      expand((data: any) => {
        return this.demoService.getList(data.pageIndex,10).pipe(
            tap(x=>{console.log(x)}),
            map((x:any)=>({
              pageIndex:data.pageIndex+1,
              data:[...data.data,...x.data],
              isComplete:x.isComplete}))
        )
      }),
      takeWhile((data: any) => !data.isComplete,true), //<--don't forget the ",true"
    ).subscribe(res=>{
       this.data=res.data
    })
  }
Run Code Online (Sandbox Code Playgroud)

请注意,在这种情况下,我们不会返回 else simple subscribe 函数并将变量 this.data 等于 res.data - 这就是我们不需要最后一个地图的原因

更新 3Mrk Sef提供

最后,如果您不希望流发出间歇性值,而只需要最终的串联数据,则可以从 中删除数据串联expand,然后reduce改为使用。

getData(pageIndex, pageSize) {
    return this.demoService.getList(pageIndex, pageSize).pipe(
      switchMap((data: any) => {
        if (!data.isCompleted) {
          return this.getData(pageIndex+1, pageSize).pipe(
            map((res: any) => ({ data: [...data.data, ...res.data] }))
          );
        }
        return of(data);
      })
    );
  }
Run Code Online (Sandbox Code Playgroud)