在 Task.WhenAll 中执行许多任务时的 C# 线程

Sti*_*son 5 c# async-await

如果在单个线程上执行以下操作会发生什么:

await Task.WhenAll(items.select(x => SomeAsyncMethod(x)))

// Where SomeAsyncMethod is defined like this (writeAsync is pure async io)
async Task SomeAsyncMethod(Item item){
  await myDevice.writeAsync(...).ConfigureAwait(false);
  //do some cpu intensive stuff...
}
Run Code Online (Sandbox Code Playgroud)

并说有 10.000 个项目items。当每个SomeAsyncMethod在 await 之后 continue 时,它​​都会在线程池中的一个线程上执行此操作。因此,当许多SomeAsyncsMethods 返回时,线程池中的多个线程将被同时获取,还是SomeAsyncMethod在这种情况下,在任何给定时刻只有一个线程执行“执行一些 CPU 密集型操作” ?

更新:好的,这是一个示例程序。当我在具有 8 个逻辑核心的 PC 上进行测试时,minthreads 为 12 或 13,maxthreads 在 35-40 范围内结束。所以看起来最多 4 个线程将被创建为 pr 逻辑核心。创建 10.000 或 100.000 个文件无关紧要 - 使用相同的最大线程数 - 也许这是因为所有任务都在排队等待访问文件系统?请注意,该程序将在 c:\tmp\asynctest 中创建大量小文件:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication4 {
    internal class Program {
        private static void Main(string[] args) {
            var myDevice = new MyDevice();
            var ints = new List<int>();
            for (var i = 0; i < 10000; i++) {
                ints.Add(i);
            }
            var task = Task.WhenAll(ints.Select(i => myDevice.WriteTextAsync(i.ToString())));
            task.Wait();
            Console.WriteLine("Max thread count = " + myDevice.MaxThreadCount);
            Console.WriteLine("Min thread count = " + myDevice.MinThreadCount);
            Console.ReadLine();
        }
    }

    public class MyDevice {
        public ConcurrentDictionary<string, string> ThreadIds;
        public int MaxThreadCount;
        public int MinThreadCount = Process.GetCurrentProcess().Threads.Count;
        public async Task WriteTextAsync(string text) {
            var filePath = @"c:\tmp\asynctest\" + text + ".txt";
            var encodedText = Encoding.Unicode.GetBytes(text);
            using (var sourceStream = new FileStream(filePath,
                FileMode.Append, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true)) {
                await sourceStream.WriteAsync(encodedText, 0, encodedText.Length).ConfigureAwait(false);
                MaxThreadCount = Math.Max(MaxThreadCount, Process.GetCurrentProcess().Threads.Count);
                MinThreadCount = Math.Min(MinThreadCount, Process.GetCurrentProcess().Threads.Count);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

更新 2。现在,如果我启动多个线程,每个线程同时执行大量 aysnc io 任务,那么与更新 1 中的单线程示例相比,似乎总共使用了更多线程。在我刚刚运行的测试中,其中10.000 个文件由 4 个线程创建,然后最大线程数为 41,最小线程数为 12 - 因此似乎有一些中央控制用于异步任务继续的线程数。这是一个示例,其中 4 个线程每个启动 10.000 个异步操作:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication4 {
    internal class Program {
        private static void Main(string[] args) {
            var myDevice = new MyDevice();
            var ints = new List<int>();
            const int limit = 10000;
            for (var i = 0; i < limit; i++) {
                ints.Add(i);
            }

            List<Task> jobs = new List<Task>();
            for (var j = 0; j < 4*limit; j+=limit) {
                var jobid = j;
                jobs.Add(Task.Run(() => Runjob(ints, myDevice, jobid)));
            }
            Task.WaitAll(jobs.ToArray());

            Console.WriteLine("Max thread count = " + myDevice.MaxThreadCount);
            Console.WriteLine("Min thread count = " + myDevice.MinThreadCount);
            Console.ReadLine();
        }

        private static void Runjob(List<int> ints, MyDevice myDevice, int jobid) {
            Console.WriteLine("Starting job " + jobid);
            var task = Task.WhenAll(ints.Select(i => myDevice.WriteTextAsync((jobid+i).ToString())));
            task.Wait();
            Console.WriteLine("Finished job " + jobid);
        }
    }

    public class MyDevice {
        public int MaxThreadCount;
        public int MinThreadCount = Process.GetCurrentProcess().Threads.Count;
        public async Task WriteTextAsync(string text) {
            var filePath = @"c:\tmp\asynctest\" + text + ".txt";
            var encodedText = Encoding.Unicode.GetBytes(text);
            using (var sourceStream = new FileStream(filePath,
                FileMode.Append, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true)) {
                await sourceStream.WriteAsync(encodedText, 0, encodedText.Length).ConfigureAwait(false);
                MaxThreadCount = Math.Max(MaxThreadCount, Process.GetCurrentProcess().Threads.Count);
                MinThreadCount = Math.Min(MinThreadCount, Process.GetCurrentProcess().Threads.Count);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Lua*_*aan 4

最有可能的情况是,“CPU 密集型工作”将分别发生在随机线程池线程上 - 如果它确实受 CPU 限制,则每个逻辑核心将获得大约 1-2 个线程来完成工作。

关键点是,虽然原始任务 ( Task.WhenAll) 的延续将在 UI 线程上运行(当然,如果有同步上下文),但各个 I/O 操作的延续将发布到线程池上,因为您明确请求忽略同步上下文 ( ConfigureAwait(false))。

但是,如果 I/O 请求同步完成,则所有内容也有可能在原始线程上运行。在这种情况下,不会进行异步调度,任务也没有机会切换线程。如果需要确保并行化,则必须Task.Run显式使用。

还应该注意的是,这主要取决于实现,而不是您可以依赖的东西。对于严重异步 I/O 应用程序来说,这也可能是一个糟糕的方法,因为您可能在 I/O 线程池中的线程上运行 CPU 密集型内容 - 扰乱线程池中线程的框架平衡,并防止新的异步响应出现,直到您完成工作。如果您正在做的工作不是纯粹的 CPU 工作,则尤其如此 - 例如,在 Web 服务器等设备上阻塞线程池线程可能会非常痛苦。