Observable.Where与异步谓词

Moj*_*ter 14 c# system.reactive async-await

有没有一种方便的方法可以使用异步函数作为Whereobservable上运算符的谓词?

例如,如果我有一个很好的整洁但可能长时间运行的函数定义如下:

Task<int> Rank(object item);
Run Code Online (Sandbox Code Playgroud)

是否有将其传递给Where异步执行并保持异步执行的技巧?如:

myObservable.Where(async item => (await Rank(item)) > 5)
Run Code Online (Sandbox Code Playgroud)

在过去,当我需要这样做时,我已经使用SelectMany并将这些结果与原始值一起投影到新类型中,然后根据它进行过滤.

myObservable.SelectMany(async item => new 
  {
    ShouldInclude = (await Rank(item)) > 5,
    Item = item
  })
  .Where(o => o.ShouldInclude)
  .Select(o => o.Item);
Run Code Online (Sandbox Code Playgroud)

我认为那是非常难以理解的,但我觉得必须有一个更清洁的方式.

svi*_*ick 13

我认为这非常难以理解

是的,但您可以通过将其封装到辅助方法中来解决这个问题.如果你调用它Where,你将获得你想要的语法:

public static IObservable<T> Where<T>(
    this IObservable<T> source, Func<T, Task<bool>> predicate)
{
    return source.SelectMany(async item => new 
        {
            ShouldInclude = await predicate(item),
            Item = item
        })
        .Where(x => x.ShouldInclude)
        .Select(x => x.Item);
}
Run Code Online (Sandbox Code Playgroud)