The*_*ias 6 c# task-parallel-library async-await c#-8.0 iasyncenumerable
我正在尝试使用C# 8提供的新工具更新我的工具集,一种似乎特别有用的方法是Task.WhenAll
返回一个IAsyncEnumerable
. 此方法应在任务结果可用时立即流式传输任务结果,因此命名它WhenAll
没有多大意义。WhenEach
听起来更合适。该方法的签名是:
public static IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks);
Run Code Online (Sandbox Code Playgroud)
这种方法可以这样使用:
var tasks = new Task<int>[]
{
ProcessAsync(1, 300),
ProcessAsync(2, 500),
ProcessAsync(3, 400),
ProcessAsync(4, 200),
ProcessAsync(5, 100),
};
await foreach (int result in WhenEach(tasks))
{
Console.WriteLine($"Processed: {result}");
}
static async Task<int> ProcessAsync(int result, int delay)
{
await Task.Delay(delay);
return result;
}
Run Code Online (Sandbox Code Playgroud)
预期输出:
已处理:5 已
处理:4 已
处理:1 已
处理:3 已
处理:2
我设法Task.WhenAny
在循环中使用该方法编写了一个基本实现,但是这种方法存在一个问题:
public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
Task<TResult>[] tasks)
{
var hashSet = new HashSet<Task<TResult>>(tasks);
while (hashSet.Count > 0)
{
var task = await Task.WhenAny(hashSet).ConfigureAwait(false);
yield return await task.ConfigureAwait(false);
hashSet.Remove(task);
}
}
Run Code Online (Sandbox Code Playgroud)
问题是性能。该实施的Task.WhenAny
创建任务提供的一系列的防守副本,如此反复称这是在一个循环的结果O(N²)计算复杂度。我的天真实现很难处理 10,000 个任务。我的机器上的开销接近 10 秒。我希望该方法的性能几乎与内置的一样Task.WhenAll
,可以轻松处理数十万个任务。我怎样才能改进WhenEach
方法以使其表现得体?
通过使用代码从这个文章,你可以实现:
public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
var inputTasks = tasks.ToList();
var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
var results = new Task<Task<T>>[buckets.Length];
for (int i = 0; i < buckets.Length; i++)
{
buckets[i] = new TaskCompletionSource<Task<T>>();
results[i] = buckets[i].Task;
}
int nextTaskIndex = -1;
Action<Task<T>> continuation = completed =>
{
var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
bucket.TrySetResult(completed);
};
foreach (var inputTask in inputTasks)
inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
return results;
}
Run Code Online (Sandbox Code Playgroud)
然后更改您WhenEach
的调用Interleaved
代码
public static async IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks)
{
foreach (var bucket in Interleaved(tasks))
{
var t = await bucket;
yield return await t;
}
}
Run Code Online (Sandbox Code Playgroud)
然后你可以WhenEach
像往常一样打电话给你
await foreach (int result in WhenEach(tasks))
{
Console.WriteLine($"Processed: {result}");
}
Run Code Online (Sandbox Code Playgroud)
我对 10k 任务进行了一些基本的基准测试,在速度方面的表现提高了 5 倍。
您可以将 Channel 用作异步队列。每个任务在完成时都可以写入通道。通道中的项目将通过ChannelReader.ReadAllAsync作为 IAsyncEnumerable 返回。
IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)
{
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
var continuations=inputTasks.Select(t=>t.ContinueWith(x=>
writer.TryWrite(x.Result)));
_ = Task.WhenAll(continuations)
.ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader.ReadAllAsync();
}
Run Code Online (Sandbox Code Playgroud)
当所有任务完成时writer.Complete()
调用关闭通道。
为了测试这一点,此代码生成具有减少延迟的任务。这应该以相反的顺序返回索引:
var tasks=Enumerable.Range(1,4)
.Select(async i=>
{
await Task.Delay(300*(5-i));
return i;
});
await foreach(var i in Interleave(tasks))
{
Console.WriteLine(i);
}
Run Code Online (Sandbox Code Playgroud)
产生:
4
3
2
1
Run Code Online (Sandbox Code Playgroud)