在linq select中异步等待

Ale*_*rck 147 c# linq asynchronous

我需要修改现有程序,它包含以下代码:

var inputs = events.Select(async ev => await ProcessEventAsync(ev))
                   .Select(t => t.Result)
                   .Where(i => i != null)
                   .ToList();
Run Code Online (Sandbox Code Playgroud)

但这对我来说似乎很奇怪,首先是使用asyncawait选择.根据Stephen Cleary的回答,我应该能够放弃这些.

然后第二个Select选择结果.这是不是意味着任务根本不是异步的并且是同步执行的(没有任何努力),或者异步执行任务,何时完成查询的其余部分?

我应该根据Stephen Cleary的另一个答案写下面的代码如下:

var tasks = await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev)));
var inputs = tasks.Where(result => result != null).ToList();
Run Code Online (Sandbox Code Playgroud)

这是完全一样的吗?

var inputs = (await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev))))
                                       .Where(result => result != null).ToList();
Run Code Online (Sandbox Code Playgroud)

当我正在研究这个项目时,我想更改第一个代码示例,但我不太热衷于更改(显然工作)异步代码.也许我只是担心什么都没有,所有3个代码示例完全相同?

ProcessEventsAsync如下所示:

async Task<InputResult> ProcessEventAsync(InputEvent ev) {...}
Run Code Online (Sandbox Code Playgroud)

Ste*_*ary 151

var inputs = events.Select(async ev => await ProcessEventAsync(ev))
                   .Select(t => t.Result)
                   .Where(i => i != null)
                   .ToList();
Run Code Online (Sandbox Code Playgroud)

但这对我来说似乎很奇怪,首先在select中使用async和await.根据Stephen Cleary的回答,我应该能够放弃这些.

呼叫Select有效.这两行基本相同:

events.Select(async ev => await ProcessEventAsync(ev))
events.Select(ev => ProcessEventAsync(ev))
Run Code Online (Sandbox Code Playgroud)

(关于如何抛出同步异常存在细微差别ProcessEventAsync,但在此代码的上下文中它根本不重要.)

然后是第二个Select选择结果.这是不是意味着任务根本不是异步的并且是同步执行的(没有任何努力),或者异步执行任务,何时完成查询的其余部分?

这意味着查询正在阻止.所以它并不是真正的异步.

打破它:

var inputs = events.Select(async ev => await ProcessEventAsync(ev))
Run Code Online (Sandbox Code Playgroud)

将首先为每个事件启动异步操作.那么这一行:

                   .Select(t => t.Result)
Run Code Online (Sandbox Code Playgroud)

将等待这些操作一次完成一个(首先它等待第一个事件的操作,然后是下一个,然后是下一个,等等).

这是我不关心的部分,因为它会阻塞并且还会包含任何异常AggregateException.

这是完全一样的吗?

var tasks = await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev)));
var inputs = tasks.Where(result => result != null).ToList();

var inputs = (await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev))))
                                       .Where(result => result != null).ToList();
Run Code Online (Sandbox Code Playgroud)

是的,这两个例子是等价的.它们都启动所有异步操作(events.Select(...)),然后异步等待所有操作以任何顺序(await Task.WhenAll(...))完成,然后继续完成其余的工作(Where...).

这两个示例都与原始代码不同.原始代码是阻塞的,并将包含异常AggregateException.

  • @SuperJMN:将`stuff.Select(x =&gt; x.Result);`替换为`await Task.WhenAll(stuff)` (2认同)
  • @DanielS:它们“本质上”是相同的。存在一些差异,例如状态机、捕获上下文、同步异常的行为。更多信息请访问 https://blog.stephencleary.com/2016/12/eliding-async-await.html (2认同)

Sid*_*dex 23

我使用了这个代码:

public static async Task<IEnumerable<TResult>> SelectAsync<TSource,TResult>(
    this IEnumerable<TSource> source, Func<TSource, Task<TResult>> method)
{
  return await Task.WhenAll(source.Select(async s => await method(s)));
}
Run Code Online (Sandbox Code Playgroud)

像这样:

var result = await sourceEnumerable.SelectAsync(async s=>await someFunction(s,other params));
Run Code Online (Sandbox Code Playgroud)

编辑:

有些人提出了并发问题,比如当你访问一个数据库时,你不能同时运行两个任务。所以这是一个更复杂的版本,它也允许特定的并发级别:

public static async Task<IEnumerable<TResult>> SelectAsync<TSource, TResult>(
    this IEnumerable<TSource> source, Func<TSource, Task<TResult>> method,
    int concurrency = int.MaxValue)
{
    var semaphore = new SemaphoreSlim(concurrency);
    try
    {
        return await Task.WhenAll(source.Select(async s =>
        {
            try
            {
                await semaphore.WaitAsync();
                return await method(s);
            }
            finally
            {
                semaphore.Release();
            }
        }));
    } finally
    {
        semaphore.Dispose();
    }
}
Run Code Online (Sandbox Code Playgroud)

如果没有参数,它的行为与上面更简单的版本完全相同。参数为 1 时,它将按顺序执行所有任务:

var result = await sourceEnumerable.SelectAsync(async s=>await someFunction(s,other params),1);
Run Code Online (Sandbox Code Playgroud)

注意:按顺序执行任务并不意味着执行会在出错时停止!

就像使用更大的并发值或未指定参数一样,所有任务都将被执行,如果其中任何一个任务失败,则生成的 AggregateException 将包含抛出的异常。

如果您想一个接一个地执行任务并在第一个失败,请尝试另一种解决方案,例如 xhafan 建议的解决方案(/sf/answers/4505442441/

  • 这是一个可爱的扩展方法。不知道为什么它被认为“更晦涩” - 它在语义上类似于同步“Select()”,因此是一个优雅的插入。 (9认同)
  • 这只是以一种更晦涩的方式包装了现有的功能imo (7认同)
  • 第一个 lambda 中的“async”和“await”是多余的。SelectAsync 方法可以简单地写为:`return wait Task.WhenAll(source.Select(method));` (5认同)
  • 确实@Nathan,为什么要有“await”呢?- ```public static Task&lt;TResult[]&gt; SelectAsync&lt;TSource,TResult&gt;(this IEnumerable&lt;TSource&gt; source, Func&lt;TSource, Task&lt;TResult&gt;&gt; method) { return Task.WhenAll(source.Select(x =&gt;)方法(x))); }``` (3认同)
  • 额外的参数是外部的,取决于我要执行的函数,它们与扩展方法的上下文无关。 (2认同)

ted*_*e24 21

现有代码正在运行,但正在阻塞该线程.

.Select(async ev => await ProcessEventAsync(ev))
Run Code Online (Sandbox Code Playgroud)

为每个事件创建一个新任务,但是

.Select(t => t.Result)
Run Code Online (Sandbox Code Playgroud)

阻止线程等待每个新任务结束.

另一方面,您的代码产生相同的结果但保持异步.

只需对您的第一个代码发表评论.这条线

var tasks = await Task.WhenAll(events...
Run Code Online (Sandbox Code Playgroud)

将生成一个单独的任务,因此该变量应以单数形式命名.

最后你的最后一个代码是相同的,但更简洁

供参考:Task.Wait/Task.WhenAll


Vit*_*kov 7

使用Linq中现有的方法,它看起来很丑陋:

var tasks = items.Select(
    async item => new
    {
        Item = item,
        IsValid = await IsValid(item)
    });
var tuples = await Task.WhenAll(tasks);
var validItems = tuples
    .Where(p => p.IsValid)
    .Select(p => p.Item)
    .ToList();
Run Code Online (Sandbox Code Playgroud)

希望以下版本的.NET能够提供更优雅的工具来处理集合的任务和任务集合.


Dar*_*ryl 6

我更喜欢将此作为扩展方法:

public static async Task<IEnumerable<T>> WhenAll<T>(this IEnumerable<Task<T>> tasks)
{
    return await Task.WhenAll(tasks);
}
Run Code Online (Sandbox Code Playgroud)

因此它可与方法链接一起使用:

var inputs = await events
  .Select(async ev => await ProcessEventAsync(ev))
  .WhenAll()
Run Code Online (Sandbox Code Playgroud)

  • @AlexanderDeck 的优点是您可以在方法链中使用它。 (4认同)
  • @Daryl 因为 `WhenAll` 返回一个计算过的列表(它不是惰性求值的),所以可以使用一个参数来使用 `Task&lt;T[]&gt;` 返回类型来表示这一点。等待时,这仍然可以使用Linq,但也表明它不懒惰。 (2认同)
  • 进一步扩展@Daryl的优点,我们可以进一步简化为: ```public static Task&lt;T[]&gt; WhenAll&lt;T&gt;(this IEnumerable&lt;Task&lt;T&gt; &gt;tasks) { return Task.WhenAll(tasks); } }``` (2认同)

KTC*_*eek 6

我想调用Select(...)但要确保它按顺序运行,因为并行运行会导致其他一些并发问题,所以我最终得到了这个。我无法调用,.Result因为它会阻塞 UI 线程。

public static class TaskExtensions
{
    public static async Task<IEnumerable<TResult>> SelectInSequenceAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> asyncSelector)
    {
        var result = new List<TResult>();
        foreach (var s in source)
        {
            result.Add(await asyncSelector(s));
        }
        
        return result;
    }
}
Run Code Online (Sandbox Code Playgroud)

用法:

var inputs = events.SelectInSequenceAsync(ev => ProcessEventAsync(ev))
                   .Where(i => i != null)
                   .ToList();
Run Code Online (Sandbox Code Playgroud)

我知道 Task.WhenAll 是我们可以并行运行的方法。

  • 已投赞成票。我更喜欢 `Task&lt;IList&lt;TResult&gt;&gt;`(或者更好的 `Task&lt;TResult[]&gt;`)的返回类型,而不是 `Task&lt;IEnumerable&lt;TResult&gt;&gt;`。后者传达了[延迟执行](/sf/ask/512682341/)的概念,这在本例中不适用案件。完成“Task”后,生成的“IEnumerable&lt;TResult&gt;”将完全具体化,因为它基于“List&lt;T&gt;”。 (2认同)

nfp*_*lee 6

我和@KTCheek 有同样的问题,我需要它按顺序执行。但是我想我会尝试使用 IAsyncEnumerable(在 .NET Core 3 中引入)并等待 foreach(在 C# 8 中引入)。这是我想出的:

public static class IEnumerableExtensions {
    public static async IAsyncEnumerable<TResult> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector) {
        foreach (var item in source) {
            yield return await selector(item);
        }
    }
}

public static class IAsyncEnumerableExtensions {
    public static async Task<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource> source) {
        var list = new List<TSource>();

        await foreach (var item in source) {
            list.Add(item);
        }

        return list;
    }
}
Run Code Online (Sandbox Code Playgroud)

这可以通过说:

var inputs = await events.SelectAsync(ev => ProcessEventAsync(ev)).ToListAsync();
Run Code Online (Sandbox Code Playgroud)

更新:或者,您可以添加对“System.Linq.Async”的引用,然后您可以说:

var inputs = await events
    .ToAsyncEnumerable()
    .SelectAwait(async ev => await ProcessEventAsync(ev))
    .ToListAsync();
Run Code Online (Sandbox Code Playgroud)

  • 这两个运算符包含在 [System.Linq.Async](https://www.nuget.org/packages/System.Linq.Async) 包中,名称为“SelectAwait”和“ToListAsync”,以及许多其他 LINQ 风格的运算符。 (2认同)