Angular 2/4 & RxJS - forEach 内的订阅 - 确保在所有 Observable 完成之前不会继续流控制

Ste*_*ace 4 ajax observable rxjs angular-http angular

目标是遍历一组 ID,为每个 ID 进行一次 HTTP 调用。对于每个 ID,我使用了一个带有 get() 方法的服务,该方法返回一个 Observable。每次调用 get() 方法时,我都会订阅返回的 Observable 并尝试将结果推送到一个数组中,该数组最终将传递给新操作的不同方法。

相关服务方式:

public get(departmentId: number): Observable<IDepartmentModel> {
    return super.get<IDepartmentModel>(departmentId);
}
Run Code Online (Sandbox Code Playgroud)

注意:超类正在利用 Angular Http,它经过充分测试并确认可以正常工作。逻辑的问题不在这里......

相关组件方法: 注意在 forEach 中多次调用的 DepartmentService.get() 调用。

 setInitialDepartmentsAssignedGridData(): void {
    this.settingsForDropdownSelectedCompanyId = this.userForm.get('defaultCompany').get('defaultCompanyId').value;

    let departments: IDepartmentModel[] = [];

    this.userService.user.getValue() //confirmed: valid user is being pulled back from the userService (logic is fine here..)
        .userCompanies.find(comp => comp.companyId === this.settingsForDropdownSelectedCompanyId) // getting a valid match here (logic is fine here..)
        .departmentIds.forEach(deptId => this.departmentService.get(deptId).first().subscribe(dept => { // getting a valid department back here (logic is fine here...)
            departments.push(dept); // HERE LIES THE PROBLEM
    }));

    this.setDepartmentsAssignedRowData(departments);
}

 setDepartmentsAssignedRowData(departments: IDepartmentModel[]): void {
    console.log('setDeptAssignedRowData called'); // confirmed: method is getting called...
    console.log(departments); // confirmed: fully-composed collection of departments logged to the console...

    departments.forEach(dept => {
        console.log(dept);
    }); // Y U NO WORK!?

    departments.map((department) => {
        console.log(department); // Y U NO WORK?
        this.departmentAssignedRowData.push({
            departmentId: department.id,
            departmentName: department.name
        });
    });

    this.departmentAssignedGridOptions.api.setRowData(this.departmentAssignedRowData);
}
Run Code Online (Sandbox Code Playgroud)

问题是,虽然记录到控制台的是一个完全组合的部门对象数组,但它并不是真正的“那里”;传递给的setDepartmentsAssignedRowData是一个空数组。

我确信发生的事情是在将部门数组传递给第二种方法之前异步操作没有完成。我在网上阅读的一些内容说要使用forkJoin,但我看不出在这种情况下会是什么样子。我也读过concatMap可能有用,但同样,在这种情况下,我不确定如何使它起作用......

在这种情况下,我如何利用 RxJS 来确保预期的、完全组合的部门数组真正准备好传递

感谢您提供的任何见解。非常感谢帮助!

LLa*_*Lai 5

你是对的,你需要 forkJoin

let observableArray = this.userService.user.getValue()
    .userCompanies.find(comp => comp.companyId === this.settingsForDropdownSelectedCompanyId)
    .departmentIds.map(deptId => this.departmentService.get(deptId)) // map is building out an array of observables
Run Code Online (Sandbox Code Playgroud)

这将是您想要并行制作的一系列 http 请求可观察对象。现在您可以将此数组传递给forkJoin.

Observable.forkJoin(...observableArray)
Run Code Online (Sandbox Code Playgroud)

forkJoin 的返回将是来自 的结果数组observableArrayforkJoin在所有 observableobservableArray完成之前,不会向序列中的下一个操作符发出信号(所以当所有 http 请求都完成时)

所以总而言之,代码将是

let observableArray = this.userService.user.getValue()
    .userCompanies.find(comp => comp.companyId === this.settingsForDropdownSelectedCompanyId)
    .departmentIds.map(deptId => this.departmentService.get(deptId));

Observable.forkJoin(...observableArray).subscribe(res => {
    // res = [resId0, resId1, resId2, ..., resIdX];
});
Run Code Online (Sandbox Code Playgroud)

您提到将结果传递给另一个操作员。如果该运算符是另一个 http 请求,您在其中传递了一组数据(来自forkJoin),那么您可以使用该flatMap运算符。

Observable.forkJoin(...observableArray)
    .flatMap(res => {
        return this.otherApi(res);
    })
    .subscribe(res => {
        // res is the result of the otherApi call
    });
Run Code Online (Sandbox Code Playgroud)

flatMap将您的 api 请求链接在一起。所以总的来说,正在发生的事情是

  1. 并行运行可观察的数组
  2. 完成后,运行第二个 api ( otherApi)