我需要通过检查某个Web服务的条目来过滤observable发出的条目.正常的observable.filter运算符在这里不合适,因为它期望谓词函数同步返回判定,但在这种情况下,只能异步检索判定.
我可以通过以下代码进行转换,但我想知道是否有一些更好的运算符我可以用于这种情况.
someObservable.flatmap(function(entry) {
return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) {
return {
verdict: verdict,
entry: entry
};
});
}).filter(function(obj) {
return obj.verdict === true;
}).map(function(obj) {
return obj.entry;
});
Run Code Online (Sandbox Code Playgroud)
Bra*_*don 10
以下是使用现有运算符实现此类运算符的方法.你需要考虑一个障碍.由于您的过滤操作是异步的,因此新项目的到达速度可能比过滤操作可以处理的速度快.在这种情况下会发生什么?是否要按顺序运行过滤器并确保维护项目的顺序?您想要并行运行过滤器并接受您的商品可能以不同的顺序出现吗?
以下是运营商的2个版本
// runs the filters in parallel (order not guaranteed)
// predicate should return an Observable
Rx.Observable.prototype.flatFilter = function (predicate) {
return this.flatMap(function (value, index) {
return predicate(value, index)
.filter(Boolean) // filter falsy values
.map(function () { return value; });
});
};
// runs the filters sequentially (order preserved)
// predicate should return an Observable
Rx.Observable.prototype.concatFilter = function (predicate) {
return this.concatMap(function (value, index) {
return predicate(value, index)
.filter(Boolean) // filter falsy values
.map(function () { return value; });
});
};
Run Code Online (Sandbox Code Playgroud)
用法:
var predicate = Rx.Observable.fromNodeCallback(someAsynCheckFunc);
someObservable.concatFilter(predicate).subscribe(...);
Run Code Online (Sandbox Code Playgroud)
从RxJS 6.0版开始,我们有了管道运算符,而不是可观察的原型方法链。
因此,我将该请求的原始代码更新为RxJS 6管道样式,该样式通过接受的答案中的信息得到了改进。
我现在将此代码重构为一个npm包。
https://www.npmjs.com/package/filter-async-rxjs-pipe
使用concatMap的串行变体已经可以正常工作,使用flatMap的并行变体当前似乎无法并行运行。但是由于我需要concatMap版本,因此我目前拥有所有需要的东西。如果有人对如何正确编写并行版本有想法,请在连接的Git存储库中添加一个问题。:)
注意
由于我只需要传递返回Promise的谓词函数,因此我将Promise到Observable的转换直接写入filterAsync方法。如果需要Observable作为过滤器输入,请随时调整代码。
export function filterAsync<T>(predicate: (value: T, index: number) => Promise<boolean>): MonoTypeOperatorFunction<T> {
let count = 0;
return pipe(
// Convert the predicate Promise<boolean> to an observable (which resolves the promise,
// Then combine the boolean result of the promise with the input data to a container object
concatMap((data: T) => {
return from(predicate(data, count++))
.pipe(map((isValid) => ({filterResult: isValid, entry: data})));
}),
// Filter the container object synchronously for the value in each data container object
filter(data => data.filterResult === true),
// remove the data container object from the observable chain
map(data => data.entry)
);
}
Run Code Online (Sandbox Code Playgroud)
这是具有完整ts文件代码的要点,包括导入:https :
//gist.github.com/bjesuiter/288326f9822e0bc82389976f8da66dd8#file-filter-async-ts
| 归档时间: |
|
| 查看次数: |
2886 次 |
| 最近记录: |