Rxjs解析可观察和合并结果的数组

Gil*_*niv 6 observable rxjs ngrx redux-observable angular2-observables

我在订阅一系列可观察的问题。就我而言,我有一个ID数组,我需要从状态中获取所有ID,并将所有结果组合成一个可观察的对象,并订阅该可观察的对象。我需要我的订阅者最终将获得一系列已解析的可观测值。

另外,我还需要保持此订阅的开放状态,因此,如果我的内在可观察的更改之一将通知我的订阅者。

这是我的代码:

getTransactionsByIDs(transactionsIDs){
return Observable.of(transactionIDs
  .map(transactionID => this.getTransactionByID(transactionID)));
}

this.transactionsService.getTransactionsByIDs(transactionsIDs)
.subscribe(transactions=>{
 ....
})
Run Code Online (Sandbox Code Playgroud)

订户函数中上述代码的结果是未解析存储的数组。

如何解决每个商店并将它们融合在一起?

我还尝试过在transactionID上使用Observable.from()将每个ID转换为一个可观察的对象,然后对其进行解析。它工作正常,但随后我的订户分别收到每个ID的通知。如果有一种方法可以批量处理所有Observable.from()结果(并保持订阅打开),请告诉我。

这是我的Observable.from()的样子:

getTransactionsByIDs(transactionsIDs){
return transactionIDs
  .mergeMap(transactionID => this.getTransactionByID(transactionID));
}

this.transactionsService.getTransactionsByIDs(Observable.from(transactionsIDs))
.subscribe(transactions=>{
 ....
})
Run Code Online (Sandbox Code Playgroud)

谢谢。

jay*_*lps 9

不完全清楚您想要什么行为,但您很可能想要forkJoinzipcombineLatest

分叉连接

当所有 observable 完成时,发出一个数组,其中包含每个 observable 的最后发出的值。

压缩

订阅所有内部 observables,等待每个人发出一个值。一旦发生这种情况,将发出具有相应索引的所有值。这将一直持续到至少一个内部 observable 完成。

结合最新

当任何 observable 发出一个值时,从每个 observable 发出最新的值。


例子:

getTransactionsByIDs(transactionsIDs) {
  const transactions = transactionIDs.map(transactionID => this.getTransactionByID(transactionID));
  return Observable.forkJoin(...transactions); // change to zip or combineLatest if needed
}


this.transactionsService.getTransactionsByIDs(['1', '2', '3'])
  .subscribe(([first, second, third]) => {
    console.log({ first, second, third });
  });
Run Code Online (Sandbox Code Playgroud)


byg*_*ace 6

我想你想要的是combineLatest。它不会发出任何值,直到所有内部可观测值发出至少一个值为止。之后,每次从内部可观察对象之一发出新的信号时,它将发出所有最新信息。

这是一些阅读材料:https : //www.learnrxjs.io/operators/combination/combinelatest.html

这是一个例子:

function getTransactionByID(transactionId) {
  let count = 0;
  return Rx.Observable.of(transactionId)
    .delay(Math.random() * 4000)
    .map(x => `${x}: ${count++} `)
    .repeat();
}

function getTransactionsByIDs(transactionsIDs){
  return Rx.Observable.combineLatest(transactionsIDs.map(transactionID => getTransactionByID(transactionID)));
}

const transactionsIDs = [1,2,3];
getTransactionsByIDs(transactionsIDs)
  .take(10)
  .subscribe(x => { console.log(x); });
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

take(10)只是使示例永远不会发生的事情。