i c*_*dez 8 c# asynchronous task-parallel-library async-await iasyncenumerable
如果可能,我想为并行启动的任务创建一个异步枚举器。所以第一个完成的是枚举的第一个元素,第二个完成的是枚举的第二个元素,依此类推。
public static async IAsyncEnumerable<T> ParallelEnumerateAsync(this IEnumerable<Task<T>> coldAsyncTasks)
{
// ...
}
Run Code Online (Sandbox Code Playgroud)
我打赌有一种使用ContinueWith和 a 的方法Queue<T>,但我并不完全相信自己会实现它。
Pau*_*ado 13
这是你要找的吗?
public static async IAsyncEnumerable<T> ParallelEnumerateAsync<T>(
this IEnumerable<Task<T>> tasks)
{
var remaining = new List<Task<T>>(tasks);
while (remaining.Count != 0)
{
var task = await Task.WhenAny(remaining);
remaining.Remove(task);
yield return (await task);
}
}
Run Code Online (Sandbox Code Playgroud)
如果我理解你的问题,你的重点是启动所有任务,让它们并行运行,但确保返回值的处理顺序与任务启动的顺序相同。
查看规范,使用C# 8.0 异步流任务排队以进行并行执行,但顺序返回可能如下所示。
/// Demonstrates Parallel Execution - Sequential Results with test tasks
async Task RunAsyncStreams()
{
await foreach (var n in RunAndPreserveOrderAsync(GenerateTasks(6)))
{
Console.WriteLine($"#{n} is returned");
}
}
/// Returns an enumerator that will produce a number of test tasks running
/// for a random time.
IEnumerable<Task<int>> GenerateTasks(int count)
{
return Enumerable.Range(1, count).Select(async n =>
{
await Task.Delay(new Random().Next(100, 1000));
Console.WriteLine($"#{n} is complete");
return n;
});
}
/// Launches all tasks in order of enumeration, then waits for the results
/// in the same order: Parallel Execution - Sequential Results.
async IAsyncEnumerable<T> RunAndPreserveOrderAsync<T>(IEnumerable<Task<T>> tasks)
{
var queue = new Queue<Task<T>>(tasks);
while (queue.Count > 0) yield return await queue.Dequeue();
}
Run Code Online (Sandbox Code Playgroud)
可能的输出:
#5 is complete
#1 is complete
#1 is returned
#3 is complete
#6 is complete
#2 is complete
#2 is returned
#3 is returned
#4 is complete
#4 is returned
#5 is returned
#6 is returned
Run Code Online (Sandbox Code Playgroud)
实际上,这种模式似乎没有任何新的语言级支持,此外,由于异步流处理IAsyncEnumerable<T>,这意味着基类Task在这里不起作用,所有工作async方法都应该具有相同的Task<T>返回类型,这在某种程度上限制了基于异步流的设计。
因此,根据您的情况(您是否希望能够取消长时间运行的任务?是否需要对每个任务进行异常处理?是否应该限制并发任务的数量?)检查一下可能是有意义的@TheGeneral 的建议。
更新:
请注意,RunAndPreserveOrderAsync<T>不一定必须使用 a Queueof tasks - 选择这只是为了更好地显示编码意图。
var queue = new Queue<Task<T>>(tasks);
while (queue.Count > 0) yield return await queue.Dequeue();
Run Code Online (Sandbox Code Playgroud)
将枚举数转换为List将产生相同的结果;的正文RunAndPreserveOrderAsync<T>可以在这里用一行替换
foreach(var task in tasks.ToList()) yield return await task;
Run Code Online (Sandbox Code Playgroud)
在此实现中,重要的是首先生成并启动所有任务,这与Queue初始化或tasks可枚举到List. 但是,可能很难拒绝foreach像这样简化上面的行
foreach(var task in tasks) yield return await task;
Run Code Online (Sandbox Code Playgroud)
这将导致任务按顺序执行而不是并行运行。
| 归档时间: |
|
| 查看次数: |
5847 次 |
| 最近记录: |