RXJS重复查询直到满足条件?

Cha*_*lie 1 rxjs angular

我使用的API只返回有限数量的结果,比如100.

我想重复一个查询,直到返回结果集< 100,这意味着我得到了最后的结果.

所以它会是这样的:

  1. 进行查询
  2. 结果集是否小于限制?如果是,请再次执行并附加结果.
  3. 一旦结果集小于限制,则发出最终结果.

ebr*_*ult 9

我们还可以使用repeatWhen运算符(类似于retryWhen但适用于完成而不是错误):

this.queryData().pipe(
    repeatWhen(obs => obs),
    filter(data => this.checkLimit()),
    take(1)
).subscribe(result => console.log(result));
Run Code Online (Sandbox Code Playgroud)

注意:take(1)需要停止repeatWhen循环。

如果我们需要重试之间的延迟,我们可以这样做:

repeatWhen(obs => obs.pipe(delay(1000)))
Run Code Online (Sandbox Code Playgroud)

  • 我认为你可以使用“find(checkLimit)”。它基本上与“filter()”和“take(1)”组合在一起相同。 (2认同)

Sur*_*iya 8

You can also use RXJS Interval with TakeWhile operator. Here is the sample code

In Component:

return Observable
  .interval(250)
  .flatMap(() => this.getQueryData())
  .takeWhile(data => this.checklimit(data))
  .subscribe(result => console.log(result);

getQueryData(){
   // HTTTP API call 
}

checklimit(){
  // return true/false
}
Run Code Online (Sandbox Code Playgroud)

  • 不幸的是,interval() 方法没有考虑响应时间。我们每 250 毫秒重新发送一次 http 请求。但响应时间本身可能会长于250ms。完成上一通话与开始新通话之间没有关联。 (2认同)

Zah*_*hiC 7

您可以使用expand运算符进行简单的"条件重复"行为.

仅作为示例,我更改了查询以返回数字,而不是结果集.以下内容将继续查询,直到检索到的数字小于100

const { defer, empty } = rxjs;
const { expand, toArray} = rxjs.operators;

const query$ = defer(async () =>  Math.floor(Math.random()*1000));
   
query$
  .pipe(
    expand(result => result < 100 ? empty() : query$),
    toArray()
  )
  .subscribe(console.log);
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>
Run Code Online (Sandbox Code Playgroud)


Ren*_*ler 5

 stop$: Subject = new Subject();

 query$.pipe(takeUntil(this.stop$)).subscribe( result => {
    if(result < limit) 
       this.stop$.next();
       this.stop$.complete();
     }
   else {
      process result, i.e append somewhere...
   }
 });
Run Code Online (Sandbox Code Playgroud)

注意他是 RxJs 6 语法。为了可读性,您可以在方法中提取订阅逻辑。

也许在这种情况下使用 takeWhile 更容易:

doQuery: boolean = true;

 query$.pipe(takeWhile(()=> this.doQuery).subscribe( result => {
    if(result < limit) 
       this.doQuery = false;
     }
   else {
      process result, i.e append somewhere...
   }
 });
Run Code Online (Sandbox Code Playgroud)