RxJS将数组映射到可观察数据并返回到数组中的普通对象

use*_*255 4 fork-join observable rxjs rxjs5 angular2-observables

我有一个对象数组,我需要将每个对象分别传递给异步方法(后面的进程用Promise处理,然后通过Observable.fromPromise(...)- 转换回Observable - 需要这种方式,因为只使用单个对象时使用相同的方法随时;进程将对象保存到数据库中).例如,这是一个对象数组:

[
  {
    "name": "John",
    ...
  },
  {
    "name": "Anna",
    ...
  },
  {
    "name": "Joe",,
    ...
  },
  {
    "name": "Alexandra",
    ...
  },
  ...
]
Run Code Online (Sandbox Code Playgroud)

现在我有一个名为insert的方法which,它将对象插入到数据库中.store数据库实例的方法返回新创建的id.最后,复制初始对象并使用其新ID进行映射:

insert(user: User): Observable<User> {
  return Observable.fromPromise(this.database.store(user)).map(
    id => {
      let storedUser = Object.assign({}, user);
      storedUser.id = id;
      return storedUser;
    }
  );
}
Run Code Online (Sandbox Code Playgroud)

这适用于我插入单个对象的情况.但是,我想添加对插入多个对象的支持,这些对象只调用单个插入的方法.目前这就是我所拥有的,但它不起作用:

insertAll(users: User[]): Observable<User[]> {
  return Observable.forkJoin(
    users.map(user => this.insert(user))
  );
}
Run Code Online (Sandbox Code Playgroud)

insertAll方法是按预期插入用户(或其他东西填充数据库的用户),但我没有得到任何回复.我正在调试正在发生的事情,似乎forkJoin只是从第一个映射用户获得响应,但其他人被忽略.订阅insertAll不做任何事情,也没有任何错误通过catch insertAll或通过订阅中的第二个参数insertAll.

所以我正在寻找一种解决方案,其中Observable(in insertAll)将以该形式向用户发回一组新对象:

[
  {
    "id": 1,
    "name": "John",
    ...
  },
  {
    "id": 2,
    "name": "Anna",
    ...
  },
  {
    "id": 3,
    "name": "Joe",,
    ...
  },
  {
    "id": 4,
    "name": "Alexandra",
    ...
  },
  ...
]
Run Code Online (Sandbox Code Playgroud)

对于任何指向正确方向的建议,我会非常高兴.提前致谢!

Dor*_*rus 7

要从数组转换为可观察数据,您可以使用Rx.Observable.from(array).

要从observable转换为array,请使用obs.toArray().请注意,这确实会返回一个数组的可观察对象,因此您仍需要将.subscribe(arr => ...)其删除.

也就是说,你的代码确实看起来是正确的.但是如果你想尝试,请编写如下代码:forkJoin from

insertAll(users: User[]): Observable<User[]> {
  return Observable.from(users)
    .mergeMap(user => this.insert(user))
    .toArray()
  );
}
Run Code Online (Sandbox Code Playgroud)

另一种更像rx的方法就是在完成时发出值,而不是等待所有这些forkJoin或类似的toArray.我们可以省略toArray上一个例子,我们得到了它:

insertAll(users: User[]): Observable<User> {
  return Observable.from(users)
    .mergeMap(user => this.insert(user))
  );
}
Run Code Online (Sandbox Code Playgroud)

正如@cartant所提到的,问题可能不在Rx中,它可能是您的数据库不支持多个连接.在这种情况下,您可以替换mergeMapwith concatMap以使Rx仅发送1个并发请求:

insertAll(users: User[]): Observable<User[]> {
  return Observable.from(users)
    .concatMap(user => this.insert(user))
    .toArray() // still optional
  );
}
Run Code Online (Sandbox Code Playgroud)