C#任务线程池 - 仅跨10个线程运行100个任务

Juz*_*ott 6 c# multithreading async-await

我只是想知道是否有人可以指出我关于async/await框架和线程池的正确方向?

基本上,我正在尝试做的是在单独的线程/异步中执行x个操作,但是最多y个线程.

例如,假设我有100个数据库操作: await _repository.WriteData(someData);

我想要做的是有一些方法一次运行10个这样的操作(理想情况是每个单独的线程,所以10个线程),并且每个完成后,下一个在线程上启动变得可用了.然后我们等待所有操作完成并完成所有线程......

这是否可以轻松实现而无需太多努力或增加大量复杂性?

i3a*_*non 14

我认为你通过专注于线程来忽略这一点,特别是对于不需要线程执行的异步操作.

.NET有一个很棒的ThreadPool你可以使用.你不知道它中有多少个线程,你不在乎.它只是工作(直到它没有,你需要自己配置它,但这是非常先进的).

在上面运行任务ThreadPool非常简单.为每个操作创建一个任务,并使用SemaphoreSlim或使用现成的TPL Dataflow块来限制它们.例如:

var block = new ActionBlock<SomeData>(
    _ => _repository.WriteDataAsync(_), // What to do on each item
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); // How many items at the same time

foreach (var item in items)
{
    block.Post(item); // Post all items to the block
}

block.Complete(); // Signal completion
await block.Completion; // Asynchronously wait for completion.
Run Code Online (Sandbox Code Playgroud)

但是,如果您计划创建"专用"线程,则可以使用创建外部专用线程Task.Factory.StartNewLongRunning选项ThreadPool.但请记住,异步操作在整个操作过程中不会保持相同的线程,因为异步操作不需要线程.因此,从专用线程开始可能毫无意义(在我的博客上更多内容:LongRunning对于Task.Run使用async-await是无用的)

  • TPL 数据流不是“内置的”。我是从你的博客里了解到的,哈哈 (2认同)

Kir*_*kiy 7

@ i3arnon的回答是正确的.使用TPL数据流.

本答案的其余部分仅用于教育目的和/或特殊用例.

我最近在一个项目中遇到了类似的问题,我无法引入任何外部依赖项,所以我不得不推出自己的负载平衡实现,结果非常简单(直到你开始接线取消和订购结果 - 但这超出了这个问题的范围).

我无视"10个专用线程"的要求,因为正如其他人已经解释的那样,在处理异步操作时没有意义.相反,我将维护处理工作负载的N并发Task实例.

static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
{
    Queue<Func<Task>> queue = new Queue<Func<Task>>(taskFactories);

    if (queue.Count == 0) {
        return;
    }

    List<Task> tasksInFlight = new List<Task>(maxDegreeOfParallelism);

    do
    {
        while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
        {
            Func<Task> taskFactory = queue.Dequeue();

            tasksInFlight.Add(taskFactory());
        }

        Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);

        // Propagate exceptions. In-flight tasks will be abandoned if this throws.
        await completedTask.ConfigureAwait(false);

        tasksInFlight.Remove(completedTask);
    }
    while (queue.Count != 0 || tasksInFlight.Count != 0);
}
Run Code Online (Sandbox Code Playgroud)

用法:

Func<Task>[] taskFactories = {
    () => _repository.WriteData(someData1),
    () => _repository.WriteData(someData2),
    () => _repository.WriteData(someData3),
    () => _repository.WriteData(someData4)
};

await InvokeAsync(taskFactories, maxDegreeOfParallelism: 2);
Run Code Online (Sandbox Code Playgroud)

... 要么

IEnumerable<SomeData> someDataCollection = ... // Get data.

await ParallelTasks.InvokeAsync(
    someDataCollection.Select(someData => new Func<Task>(() => _repository.WriteData(someData))),
    maxDegreeOfParallelism: 10
);
Run Code Online (Sandbox Code Playgroud)

该解决方案不会遭受差的负载平衡问题,这在其中任务具有变化的持续时间并且输入被预分区(例如这个)的情况下经常在其他简单的实现中看到.

具有性能优化和参数验证的版本:Gist.