如何在异步谓词中使用"Where"?

Sam*_*Sam 14 .net linq async-await c#-5.0

我有一个像这样的异步谓词方法:

private async Task<bool> MeetsCriteria(Uri address)
{
    //Do something involving awaiting an HTTP request.
}
Run Code Online (Sandbox Code Playgroud)

说我有一个Uris 的集合:

var addresses = new[]
{
    new Uri("http://www.google.com/"),
    new Uri("http://www.stackoverflow.com/") //etc.
};
Run Code Online (Sandbox Code Playgroud)

我想过滤addresses使用MeetsCriteria.我想异步这样做; 我希望多次调用谓词以异步方式运行,然后我想等待所有这些调用完成并生成过滤结果集.不幸的是,LINQ似乎并不支持异步谓词,所以这样的事情工作:

var filteredAddresses = addresses.Where(MeetsCriteria);
Run Code Online (Sandbox Code Playgroud)

有没有一个类似的方便的方法来做到这一点?

jul*_*gon 17

考虑到框架的新版本和界面的采用IAsyncEnumerable<T>,我不会再在这里建议任何其他高度自定义的答案,因为它们基本上是不必要的。

LINQ 的异步版本可通过NuGet 包获得System.Linq.Async

这是进行异步检查的语法:

var filteredAddresses = addresses
    .ToAsyncEnumerable()
    .WhereAwait(async x => await MeetsCriteria(x));
Run Code Online (Sandbox Code Playgroud)

filteredAddresses类型为IAsyncEnumerable<int>,可以是:

  • 物化为ToListAsync,FirstAsync
  • 迭代与await foreach

要获得与以前相同的效果并允许使用方法组进行调用,您可以将MeetsCriteriato的返回类型更改为ValueTask

private async ValueTask<bool> MeetsCriteria(Uri address)
{
    //Do something involving awaiting an HTTP request.
}

...

var filteredAddresses = addresses
    .ToAsyncEnumerable()
    .WhereAwait(MeetsCriteria);
Run Code Online (Sandbox Code Playgroud)

不过,我不建议ValueTask仅使用它来保存几个字符,因为它应该进行基准测试并出于性能/内存原因而使用。


svi*_*ick 9

我认为在框架中没有这样的原因之一是存在许多可能的变化,并且在某些情况下每个选择都是正确的:

  • 谓词应该并行还是串行执行?
    • 如果它们并行执行,它们是否应该立即执行,还是应该限制并行度?
    • 如果它们并行执行,结果应该与原始集合的顺序,完成顺序还是未定义的顺序相同?
      • 如果它们应该按完成顺序返回,是否应该有某种方式(异步)获得完成后的结果?(这需要将返回类型更改为Task<IEnumerable<T>>其他内容.)

你说你希望谓词并行执行.在这种情况下,最简单的选择是一次性执行它们并按完成顺序返回它们:

static async Task<IEnumerable<T>> Where<T>(
    this IEnumerable<T> source, Func<T, Task<bool>> predicate)
{
    var results = new ConcurrentQueue<T>();
    var tasks = source.Select(
        async x =>
        {
            if (await predicate(x))
                results.Enqueue(x);
        });
    await Task.WhenAll(tasks);
    return results;
}
Run Code Online (Sandbox Code Playgroud)

然后你可以像这样使用它:

var filteredAddresses = await addresses.Where(MeetsCriteria);
Run Code Online (Sandbox Code Playgroud)


Luc*_*hik 8

第一种方法:一个接一个地预先发出所有请求,然后等待所有请求返回,然后过滤结果.(svick的代码也做了这个,但是我在这里没有中间的ConcurrentQueue).

// First approach: massive fan-out
var tasks = addresses.Select(async a => new { A = a, C = await MeetsCriteriaAsync(a) });
var addressesAndCriteria = await Task.WhenAll(tasks);
var filteredAddresses = addressAndCriteria.Where(ac => ac.C).Select(ac => ac.A);
Run Code Online (Sandbox Code Playgroud)

第二种方法:一个接一个地做请求.这将花费更长的时间,但它将确保不会因为大量的请求而破坏Web服务(假设MeetsCriteriaAsync发送到Web服务......)

// Second approach: one by one
var filteredAddresses = new List<Uri>();
foreach (var a in filteredAddresses)
{
  if (await MeetsCriteriaAsync(a)) filteredAddresses.Add(a);
}
Run Code Online (Sandbox Code Playgroud)

第三种方法:至于第二种方法,但使用假设的C#8特征"异步流".C#8尚未推出,异步流尚未设计,但我们可以梦想!IAsyncEnumerable类型已经存在于RX中,并且希望它们会为它添加更多的组合器.关于IAsyncEnumerable的好处是,我们可以在它们到来时立即开始使用前几个过滤后的地址,而不是等待首先过滤所有内容.

// Third approach: ???
IEnumerable<Uri> addresses = {...};
IAsyncEnumerable<Uri> filteredAddresses = addresses.WhereAsync(MeetsCriteriaAsync);
Run Code Online (Sandbox Code Playgroud)

第四种方法:也许我们不想同时对所有请求进行网络服务,但我们很乐意一次发出多个请求.也许我们做了实验,发现"一次三个"是一个幸福的媒介.注意:此代码假定单线程执行上下文,例如UI编程或ASP.NET.如果它在多线程执行上下文中运行,那么它需要ConcurrentQueue和ConcurrentList.

// Fourth approach: throttle to three-at-a-time requests
var addresses = new Queue<Uri>(...);
var filteredAddresses = new List<Uri>();
var worker1 = FilterAsync(addresses, filteredAddresses);
var worker2 = FilterAsync(addresses, filteredAddresses);
var worker3 = FilterAsync(addresses, filteredAddresses);
await Task.WhenAll(worker1, worker2, worker3);

async Task FilterAsync(Queue<Uri> q, List<Uri> r)
{
  while (q.Count > 0)
  {
    var item = q.Dequeue();
    if (await MeetsCriteriaAsync(item)) r.Add(item);
  }
}
Run Code Online (Sandbox Code Playgroud)

使用TPL数据流库的第四种方法也有办法.


sht*_*se8 6

我认为这比不使用任何并发队列的公认答案更简单。

public static async Task<IEnumerable<T>> Where<T>(this IEnumerable<T> source, Func<T, Task<bool>> predicate)
{
    var results = await Task.WhenAll(source.Select(async x => (x, await predicate(x))));
    return results.Where(x => x.Item2).Select(x => x.Item1);
}
Run Code Online (Sandbox Code Playgroud)


Afs*_*var 6

我会使用下面的方法而不是使用ConcurrentBagConcurrentQueue

public static async IAsyncEnumerable<T> WhereAsync<T>(this IEnumerable<T> source, Func<T, Task<bool>> predicate)
{
    foreach(var item in source)
    {
        if(await (predicate(item)))
        {
            yield return item;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

例如

    var result =  numbers.WhereAsync(async x =>
                                               await IsEvenAsync(x));
    await foreach (var x in result)
    {
        Console.Write($"{x},");
    }
Run Code Online (Sandbox Code Playgroud)