Eva*_*oll 3 queue parallel-processing rxjs concurrent-queue
假设我想下载 10,000 个文件。我可以轻松地构建一个包含这 10,000 个文件的队列(如果其中任何一个可以做得更好,我很乐意接受建议),
import request from 'request-promise-native';
import {from} from 'rxjs';
let reqs = [];
for ( let i = 0; i < 10000; i++ ) {
reqs.push(
from(request(`http://bleh.com/${i}`))
)
};
Run Code Online (Sandbox Code Playgroud)
现在我有一个 Rx.JS observable 数组,我是从代表我的队列的承诺中创建的。现在对于我想要的行为,我想发出
我可以为这个问题创建一个解决方案,但鉴于我从未使用过的Rxjs queue 之类的东西,我想知道最正确的 Rxjs 方法是什么。
听起来你想要一个forkJoin
支持调用者指定的最大并发订阅数的等价物。
可以重新实现forkJoin
usingmergeMap
和公开concurrent
参数,如下所示:
import { from, Observable } from "rxjs";
import { last, map, mergeMap, toArray } from "rxjs/operators";
export function forkJoinConcurrent<T>(
observables: Observable<T>[],
concurrent: number
): Observable<T[]> {
// Convert the array of observables to a higher-order observable:
return from(observables).pipe(
// Merge each of the observables in the higher-order observable
// into a single stream:
mergeMap((observable, observableIndex) => observable.pipe(
// Like forkJoin, we're interested only in the last value:
last(),
// Combine the value with the index so that the stream of merged
// values - which could be in any order - can be sorted to match
// the order of the source observables:
map(value => ({ index: observableIndex, value }))
), concurrent),
// Convert the stream of last values to an array:
toArray(),
// Sort the array of value/index pairs by index - so the value
// indices correspond to the source observable indices and then
// map the pair to the value:
map(pairs => pairs.sort((l, r) => l.index - r.index).map(pair => pair.value))
);
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1268 次 |
最近记录: |