如何使用 C#8 IAsyncEnumerable<T> 异步枚举并行运行的任务

i c*_*dez 8 c# asynchronous task-parallel-library async-await iasyncenumerable

如果可能,我想为并行启动的任务创建一个异步枚举器。所以第一个完成的是枚举的第一个元素,第二个完成的是枚举的第二个元素,依此类推。

public static async IAsyncEnumerable<T> ParallelEnumerateAsync(this IEnumerable<Task<T>> coldAsyncTasks)
{
    // ... 
}
Run Code Online (Sandbox Code Playgroud)

我打赌有一种使用ContinueWith和 a 的方法Queue<T>,但我并不完全相信自己会实现它。

Pau*_*ado 13

这是你要找的吗?

public static async IAsyncEnumerable<T> ParallelEnumerateAsync<T>(
    this IEnumerable<Task<T>> tasks)
{
    var remaining = new List<Task<T>>(tasks);

    while (remaining.Count != 0)
    {
        var task = await Task.WhenAny(remaining);
        remaining.Remove(task);
        yield return (await task);
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 如果任务数量较多(超过 1,000 个),此解决方案将不再有效。在这种情况下,按完成情况对任务进行排序的另一种更有效的方法是使用 [`OrderByCompletion`](http://dotnetapis.com/pkg/Nito.AsyncEx.Tasks/5.0.0/netstandard2.0/doc /Nito.AsyncEx.TaskExtensions/OrderByCompletion''1(System.Collections.Generic.IEnumerable(System.Threading.Tasks.Task(''0)))) 来自 Stephen Cleary 的 [Nito.AsyncEx](https:// www.nuget.org/packages/Nito.AsyncEx)库。 (2认同)

DK.*_*DK. 6

如果我理解你的问题,你的重点是启动所有任务,让它们并行运行,但确保返回值的处理顺序与任务启动的顺序相同。

查看规范,使用C# 8.0 异步流任务排队以进行并行执行,但顺序返回可能如下所示。

/// Demonstrates Parallel Execution - Sequential Results with test tasks
async Task RunAsyncStreams()
{
    await foreach (var n in RunAndPreserveOrderAsync(GenerateTasks(6)))
    {
        Console.WriteLine($"#{n} is returned");
    }
}

/// Returns an enumerator that will produce a number of test tasks running
/// for a random time.
IEnumerable<Task<int>> GenerateTasks(int count)
{
    return Enumerable.Range(1, count).Select(async n =>
    {
        await Task.Delay(new Random().Next(100, 1000));
        Console.WriteLine($"#{n} is complete");
        return n;
    });
}

/// Launches all tasks in order of enumeration, then waits for the results
/// in the same order: Parallel Execution - Sequential Results.
async IAsyncEnumerable<T> RunAndPreserveOrderAsync<T>(IEnumerable<Task<T>> tasks)
{
    var queue = new Queue<Task<T>>(tasks);
    while (queue.Count > 0) yield return await queue.Dequeue();
}
Run Code Online (Sandbox Code Playgroud)

可能的输出:

#5 is complete
#1 is complete
#1 is returned
#3 is complete
#6 is complete
#2 is complete
#2 is returned
#3 is returned
#4 is complete
#4 is returned
#5 is returned
#6 is returned
Run Code Online (Sandbox Code Playgroud)

实际上,这种模式似乎没有任何新的语言级支持,此外,由于异步流处理IAsyncEnumerable<T>,这意味着基类Task在这里不起作用,所有工作async方法都应该具有相同的Task<T>返回类型,这在某种程度上限制了基于异步流的设计。

因此,根据您的情况(您是否希望能够取消长时间运行的任务?是否需要对每个任务进行异常处理?是否应该限制并发任务的数量?)检查一下可能是有意义的@TheGeneral 的建议。

更新:

请注意,RunAndPreserveOrderAsync<T>不一定必须使用 a Queueof tasks - 选择这只是为了更好地显示编码意图。

var queue = new Queue<Task<T>>(tasks);
while (queue.Count > 0) yield return await queue.Dequeue();
Run Code Online (Sandbox Code Playgroud)

将枚举数转换为List将产生相同的结果;的正文RunAndPreserveOrderAsync<T>可以在这里用一行替换

foreach(var task in tasks.ToList()) yield return await task;
Run Code Online (Sandbox Code Playgroud)

在此实现中,重要的是首先生成并启动所有任务,这与Queue初始化或tasks可枚举到List. 但是,可能很难拒绝foreach像这样简化上面的行

foreach(var task in tasks) yield return await task;
Run Code Online (Sandbox Code Playgroud)

这将导致任务按顺序执行而不是并行运行。