当使用 rxjs 6 在 angular 6 中有新请求时,如何取消正在进行的 HTTP 请求

Kri*_*hna 5 javascript rxjs typescript angular

我在下面RxnsSearchService,并RxnsSearchHitCountService在我的应用程序,两个HTTP服务。

使用forkJoin下面的代码处理两个请求。

constructor(
  private rxnsSearchService: RxnsSearchService,
  private rxnsSearchHitCountService: RxnsSearchHitCountService
) { }
const rxnsObservable: Observable<Array<any>> = this.rxnsSearchService.getReactions(this.searchParams, filters);
const headCountObservable: Observable<number> = this.rxnsSearchHitCountService.getHitCount(this.searchParams, filters);
forkJoin([rxnsObservable, headCountObservable]).pipe().subscribe((results) => { //handling results 
},
  error => {
    console.log(error);
  });
Run Code Online (Sandbox Code Playgroud)

每当有新请求出现时,我都想取消正在进行的旧请求。任何人都可以帮助我,使其解决吗?

export class RxnsSearchService {
  sub: Subject<any> = new Subject();
  constructor(private httpClient: HttpClient) {}

  getReactions(params: Params, offset: number, perPage: number, filters: any) {
    const body = {
      filters: filters,
      query: params.query
    };
     return this.httpClient.post(environment.rxnsSearch, body).pipe(
      map((response: Array<any>) => {
        return response;
      }),
      catchError(error => {
        console.log(error);
        return throwError(error);
      })
    );
  }
}
Run Code Online (Sandbox Code Playgroud)

export class RxnsSearchHitCountService {
  constructor(private httpClient: HttpClient) {}

  getHitCount(params: Params, filters: any) {
    const body = {
      filters: filters,
      query: params.query,
    };
    return this.httpClient.post(environment.rxnsSearchHitCount, body).pipe(
      map((response: number) => {
        return response;
      }),
      catchError(error => {
        console.log(error);
        return throwError(error);
      })
    );
  }
}
Run Code Online (Sandbox Code Playgroud)

Ing*_*ürk 6

我将通过一个简化的示例介绍如何执行此操作的一般方法。假设我们目前有这个:

public getReactions() {
  this.http.get(…)
    .subscribe(reactions => this.reactions = reactions);
}
Run Code Online (Sandbox Code Playgroud)

确保旧请求被取消的方法是在某些主题上发出:

private reactionsTrigger$ = new Subject<void>();

public getReactions() {
  this.reactionsTrigger$.next();
}
Run Code Online (Sandbox Code Playgroud)

现在我们有一个 observable 表示触发新请求的事件流。您现在可以实现OnInit如下内容:

public ngOnInit() {
  this.reactionsTrigger$.pipe(
    // Use this line if you want to load reactions once initially
    // Otherwise, just remove it
    startWith(undefined),

    // We switchMap the trigger stream to the request
    // Due to how switchMap works, if a previous request is still
    // running, it will be cancelled.
    switchMap(() => this.http.get(…)),

    // We have to remember to ensure that we'll unsubscribe from
    // this when the component is destroyed
    takeUntil(this.destroy$),
  ).subscribe(reactions => this.reactions = reactions);
}

// Just in case you're unfamiliar with it, this is how you create
// an observable for when the component is destroyed. This helps
// us to unsubscribe properly in the code above
private destroy$ = new Subject<void>();
public ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}
Run Code Online (Sandbox Code Playgroud)

线

    switchMap(() => this.http.get(…)),
Run Code Online (Sandbox Code Playgroud)

在您的情况下,实际上可能会将事件切换到forkJoin

    switchMap(() => forkJoin([rxnsObservable, headCountObservable])),
Run Code Online (Sandbox Code Playgroud)

如果您希望单个事件流重新触发两个请求。


Rae*_*laf 3

您可以简单地在 RxJs 中使用 debounce 运算符:

debounce(1000);
Run Code Online (Sandbox Code Playgroud)

它提供了一种在发送任何请求之前设置延迟(以毫秒为单位)的方法,

该示例仅用一个请求替换 1000 毫秒内发出的所有请求。

欲了解更多详情:

fromEvent(input, 'input').pipe(
       map((e: any) => e.target.value),
       debounceTime(500),
       distinctUntilChanged()
     ).subscribe(
       (data) => {
           this.onFilterTable({column: fieldName, data});
       }
     );
Run Code Online (Sandbox Code Playgroud)