如何实现一个高效的 WhenEach 来传输 IAsyncEnumerable 的任务结果?

The*_*ias 6 c# task-parallel-library async-await c#-8.0 iasyncenumerable

我正在尝试使用C# 8提供的新工具更新我的工具集,一种似乎特别有用的方法是Task.WhenAll返回一个IAsyncEnumerable. 此方法应在任务结果可用时立即流式传输任务结果,因此命名它WhenAll没有多大意义。WhenEach听起来更合适。该方法的签名是:

public static IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks);
Run Code Online (Sandbox Code Playgroud)

这种方法可以这样使用:

var tasks = new Task<int>[]
{
    ProcessAsync(1, 300),
    ProcessAsync(2, 500),
    ProcessAsync(3, 400),
    ProcessAsync(4, 200),
    ProcessAsync(5, 100),
};

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

static async Task<int> ProcessAsync(int result, int delay)
{
    await Task.Delay(delay);
    return result;
}
Run Code Online (Sandbox Code Playgroud)

预期输出:

已处理:5 已
处理:4 已
处理:1 已
处理:3 已
处理:2

我设法Task.WhenAny在循环中使用该方法编写了一个基本实现,但是这种方法存在一个问题:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)
{
    var hashSet = new HashSet<Task<TResult>>(tasks);
    while (hashSet.Count > 0)
    {
        var task = await Task.WhenAny(hashSet).ConfigureAwait(false);
        yield return await task.ConfigureAwait(false);
        hashSet.Remove(task);
    }
}
Run Code Online (Sandbox Code Playgroud)

问题是性能。该实施Task.WhenAny创建任务提供的一系列的防守副本,如此反复称这是在一个循环的结果O(N²)计算复杂度。我的天真实现很难处理 10,000 个任务。我的机器上的开销接近 10 秒。我希望该方法的性能几乎与内置的一样Task.WhenAll,可以轻松处理数十万个任务。我怎样才能改进WhenEach方法以使其表现得体?

Joh*_*anP 5

通过使用代码从这个文章,你可以实现:

public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
   var inputTasks = tasks.ToList();

   var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
   var results = new Task<Task<T>>[buckets.Length];
   for (int i = 0; i < buckets.Length; i++)
   {
       buckets[i] = new TaskCompletionSource<Task<T>>();
       results[i] = buckets[i].Task;
   }

   int nextTaskIndex = -1;
   Action<Task<T>> continuation = completed =>
   {
       var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
       bucket.TrySetResult(completed);
   };

   foreach (var inputTask in inputTasks)
       inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

   return results;
}
Run Code Online (Sandbox Code Playgroud)

然后更改您WhenEach的调用Interleaved代码

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks)
{
    foreach (var bucket in Interleaved(tasks))
    {
        var t = await bucket;
        yield return await t;
    }
}
Run Code Online (Sandbox Code Playgroud)

然后你可以WhenEach像往常一样打电话给你

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}
Run Code Online (Sandbox Code Playgroud)

我对 10k 任务进行了一些基本的基准测试,在速度方面的表现提高了 5 倍。


Pan*_*vos 5

您可以将 Channel 用作异步队列。每个任务在完成时都可以写入通道。通道中的项目将通过ChannelReader.ReadAllAsync作为 IAsyncEnumerable 返回。

IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)
{
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    var continuations=inputTasks.Select(t=>t.ContinueWith(x=>
                                           writer.TryWrite(x.Result)));
    _ = Task.WhenAll(continuations)
            .ContinueWith(t=>writer.Complete(t.Exception));

    return channel.Reader.ReadAllAsync();
}
Run Code Online (Sandbox Code Playgroud)

当所有任务完成时writer.Complete()调用关闭通道。

为了测试这一点,此代码生成具有减少延迟的任务。这应该以相反的顺序返回索引:

var tasks=Enumerable.Range(1,4)
                    .Select(async i=>
                    { 
                      await Task.Delay(300*(5-i));
                      return i;
                    });

await foreach(var i in Interleave(tasks))
{
     Console.WriteLine(i);

}
Run Code Online (Sandbox Code Playgroud)

产生:

4
3
2
1
Run Code Online (Sandbox Code Playgroud)