Juz*_*ott 6 c# multithreading async-await
我只是想知道是否有人可以指出我关于async/await框架和线程池的正确方向?
基本上,我正在尝试做的是在单独的线程/异步中执行x个操作,但是最多y个线程.
例如,假设我有100个数据库操作:
await _repository.WriteData(someData);
我想要做的是有一些方法一次运行10个这样的操作(理想情况是每个单独的线程,所以10个线程),并且每个完成后,下一个在线程上启动变得可用了.然后我们等待所有操作完成并完成所有线程......
这是否可以轻松实现而无需太多努力或增加大量复杂性?
i3a*_*non 14
我认为你通过专注于线程来忽略这一点,特别是对于不需要线程执行的异步操作.
.NET有一个很棒的ThreadPool你可以使用.你不知道它中有多少个线程,你不在乎.它只是工作(直到它没有,你需要自己配置它,但这是非常先进的).
在上面运行任务ThreadPool非常简单.为每个操作创建一个任务,并使用SemaphoreSlim或使用现成的TPL Dataflow块来限制它们.例如:
var block = new ActionBlock<SomeData>(
_ => _repository.WriteDataAsync(_), // What to do on each item
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); // How many items at the same time
foreach (var item in items)
{
block.Post(item); // Post all items to the block
}
block.Complete(); // Signal completion
await block.Completion; // Asynchronously wait for completion.
Run Code Online (Sandbox Code Playgroud)
但是,如果您计划创建"专用"线程,则可以使用创建外部专用线程Task.Factory.StartNew的LongRunning选项ThreadPool.但请记住,异步操作在整个操作过程中不会保持相同的线程,因为异步操作不需要线程.因此,从专用线程开始可能毫无意义(在我的博客上更多内容:LongRunning对于Task.Run使用async-await是无用的)
@ i3arnon的回答是正确的.使用TPL数据流.
本答案的其余部分仅用于教育目的和/或特殊用例.
我最近在一个项目中遇到了类似的问题,我无法引入任何外部依赖项,所以我不得不推出自己的负载平衡实现,结果非常简单(直到你开始接线取消和订购结果 - 但这超出了这个问题的范围).
我无视"10个专用线程"的要求,因为正如其他人已经解释的那样,在处理异步操作时没有意义.相反,我将维护处理工作负载的N并发Task实例.
static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
{
Queue<Func<Task>> queue = new Queue<Func<Task>>(taskFactories);
if (queue.Count == 0) {
return;
}
List<Task> tasksInFlight = new List<Task>(maxDegreeOfParallelism);
do
{
while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
{
Func<Task> taskFactory = queue.Dequeue();
tasksInFlight.Add(taskFactory());
}
Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
// Propagate exceptions. In-flight tasks will be abandoned if this throws.
await completedTask.ConfigureAwait(false);
tasksInFlight.Remove(completedTask);
}
while (queue.Count != 0 || tasksInFlight.Count != 0);
}
Run Code Online (Sandbox Code Playgroud)
用法:
Func<Task>[] taskFactories = {
() => _repository.WriteData(someData1),
() => _repository.WriteData(someData2),
() => _repository.WriteData(someData3),
() => _repository.WriteData(someData4)
};
await InvokeAsync(taskFactories, maxDegreeOfParallelism: 2);
Run Code Online (Sandbox Code Playgroud)
... 要么
IEnumerable<SomeData> someDataCollection = ... // Get data.
await ParallelTasks.InvokeAsync(
someDataCollection.Select(someData => new Func<Task>(() => _repository.WriteData(someData))),
maxDegreeOfParallelism: 10
);
Run Code Online (Sandbox Code Playgroud)
该解决方案不会遭受差的负载平衡问题,这在其中任务具有变化的持续时间并且输入被预分区(例如这个)的情况下经常在其他简单的实现中看到.
具有性能优化和参数验证的版本:Gist.
| 归档时间: |
|
| 查看次数: |
12390 次 |
| 最近记录: |