如何使用 RxJS 处理数百个请求而不崩溃?

Bas*_*ear 2 concatenation rxjs angular

我需要处理 400 多个员工对象,其中涉及为每个对象调用一个可观察的服务方法。

我需要知道所有员工何时已被处理,并且我需要找到一种方法来执行此操作,该方法不会导致我的客户端或服务器因同时请求而过载 - 这就是目前正在发生的情况。

我想将每个员工的处理结果收集到一个数组中,我可以从中进一步处理数据(我正在创建一个导出文件)。

我一直在尝试 RxJS concatforkjoin,但我似乎仍然有 400 多个任务同时启动。

我阅读了有关concat的页面,该页面表明处理是一次处理一个(就像 ATM 上的队列),但我不相信 - 我的浏览器再次挂起,网络请求数量建议全部-对我来说是同一时间。

如果我可以获得一次处理一名员工的代码,并在处理列表中的下一位员工之前完成,那就太好了。

processEmployees(): AsyncSubject<boolean> {
    let employeesProcessed: AsyncSubject<boolean> = new AsyncSubject<boolean>();
    let listOfProcessesToRun = [];

    this.employeeProfilesList.forEach(
        (employeeProfile: EmployeeProfile) => {
            listOfProcessesToRun.push(
                this.annualLeaveCalculationService.startProcess(employeeProfile, this.yearToProcess).delay(2000)
            );
        }
    )

    Observable.concat(listOfProcessesToRun)
        .finally(() => {
        })
        .subscribe(
        (calculationResults: AnnualLeaveCalculationResults) => {
            this.calculationResults.push(calculationResults);
            employeesProcessed.complete();
        },
        error => {
        });

    return employeesProcessed;
}
Run Code Online (Sandbox Code Playgroud)

我尝试过使用Observable.forkjoin来处理一系列可观察量,但我相信请求是并行处理的,这导致我的机器挂起,因为有大约 400 名员工需要处理。

我考虑过嵌套解决方案,例如

processemployee(X) {
    doServiceCall(this.employees[x])
    .subscribe( response => {
            processemployee(++x); // with a check to detect number of loops
        }
    );
}
Run Code Online (Sandbox Code Playgroud)

但我不确定这是一个很好的模式。

Mar*_*ten 5

您可以使用.mergeMap重载,它接受concurreny并将其设置为服务的正常值(例如 20req 并发)。这样您就可以优化最大吞吐量,而不会淹没您的服务器。