Effect 中的可取消 NGRX 服务轮询

Jas*_*erJ 5 ngrx ngrx-effects angular ngrx-store

如何轮询具有停止轮询直到服务的返回值满足条件或总持续时间超过超时阈值的效果的服务?

例如:后端系统正在生成资源,前端应用程序可以通过调用返回布尔值的 REST api 调用来检查该资源是否可用。在我的 NGRX 应用程序中,我想每 200 毫秒轮询一次此 api 调用,直到此 api 调用返回布尔值 true 或总轮询持续时间超过 10000 毫秒的阈值。

以下代码示例显示了一种轮询机制,但是,该轮询无法取消,也没有超时。这是怎么做的?

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/switchMap';

@Effect()
pollEffect$: Observable<Action> = this.actions$
  .ofType(tasksActions.ActionTypes.START_POLLING)
  .switchMap(() => Observable.timer(0, 200)
    .switchMap(() => this.myBackendService.getAvailability().map(response => 
       return taskActions.ActionTypes.UpdateValue(response))
    )
  );
Run Code Online (Sandbox Code Playgroud)

小智 7

有完全相同的问题,我无法从互联网上找到答案。我花了一些时间才让它发挥作用。这就是我所做的。我注意到我把“takeUntil(this.pollingUntil$)”放在哪里很重要,我把它放在顶部,因为我需要用最新数据更新存储。如果我把它放在底部(注释行),那么轮询被取消,我无法使用最新数据更新存储。

private readonly pollingIntervalMs = 5000;
private readonly maxPollingMs = 600000; // 10 sec

private pollingUntil$: Subject<boolean> = new Subject<boolean>();

@Effect()
pollDb$: Observable<Action> = this.actions$.pipe(
  ofType(infraActions.InfraActionTypes.PollDb),
  switchMap(pollAction => interval(this.pollingIntervalMs).pipe(
    takeUntil(timer(this.maxPollingMs)),
    takeUntil(this.pollingUntil$),
    mapTo(pollAction),
    switchMap(
      (action: infraActions.PollDb) => this.dbService.get().pipe(
        map((dbInfo: DbInfo) => {
          if (meet condition) {
            this.pollingUntil$.next(true);
          }
          return dbInfo;
        })
      )
    ),
//    takeUntil(this.pollingUntil$),
    map((dbInfo: DbInfo) => {
      return new infraActions.PollDbSuccess(dbInfo);
    }),
    catchError(err => of(new infraActions.LoadDbFail(err)))
  )),
);
Run Code Online (Sandbox Code Playgroud)


San*_*wal -1

您可以根据需要使用过滤器运算符来过滤数据。当条件为真时,过滤器运算符将发出数据。

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/switchMap';
import 'rxjs/add/operator/filter';

@Effect()
pollEffect$: Observable<Action> = this.actions$
  .ofType(tasksActions.ActionTypes.START_POLLING)
  .switchMap(() => Observable.timer(0, 200)
    .switchMap(() => this.myBackendService.getAvailability().filter(response => //condition).map(response => 
       return taskActions.ActionTypes.UpdateValue(response))
    )
  );
Run Code Online (Sandbox Code Playgroud)