我正在尝试创建一个TaskScheduler,它可以限制可以同时运行的线程数.我正在使用这个例子.问题是我看到了一些我不理解的行为.
如果我像示例一样创建我的类:
LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(5);
TaskFactory factory = new TaskFactory(lcts);
然后像这样运行:
foreach (int i = 0; i < 10; ++i) 
{
    factory.StartNew(() => DoWork());
}
工作看起来像这样:
private async Task DoWork()
{
    // Do some stuff here like
    StaticIntValueWorkOne++;
    // And then more stuff that is async here
    int someValue = await DoAdditionalWorkAsync();
    Thread.Sleep(10000);
    StaticIntValueWorkTwo++;
}
我看到的是StaticIntValueWorkOne立即增加10次,而StaticIntValueWorkTwo只增加一次.然后在10秒后我看到StaticIntValueWorkTwo增量,然后每10秒后增加.我没有得到的是等待DoAdditionalWorkAsync()和并发做的事情.我以为我会看到StaticIntValueWorkOne增加一次,然后StaticIntValueWorkTwo会增加一次.我错过了什么?我只需要一个await对factor.StartNew()?
我正在尝试创建一个TaskScheduler,它可以限制可以同时运行的线程数.
您可能想直接跳到答案.;)
var scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 5)
    .ConcurrentScheduler;
我没有得到的是DoAdditionalWorkAsync()上的await正在使用并发.
任务调度程序仅适用于执行代码.当一个async方法在任务调度程序上执行时,您可以将其视为分解为多个任务,每个await点都有一个中断.默认情况下,await该async方法将重新进入其任务调度程序.该async方法在进行时不在任务调度程序中await.
因此,当方法正在进行时,调度限制(一次5个)根本不适用await.因此,在您DoWork的方法中,该方法将首先递增变量,然后屈服于任务调度程序.虽然屈服了,但它并没有"计入"你的并发限制.稍后,当该方法恢复时,它将阻塞线程(它会"计数")并增加第二个变量.
使用此代码:
private static void Main(string[] args)
{
    var scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 5)
        .ConcurrentScheduler;
    TaskFactory factory = new TaskFactory(scheduler);
    for (int i = 0; i < 10; ++i)
    {
        factory.StartNew(() => DoWork());
    }
    Console.ReadKey();
}
private static int StaticIntValueWorkOne, StaticIntValueWorkTwo;
private static async Task DoWork()
{
    // Do some stuff here like
    Console.WriteLine(DateTime.UtcNow + " StaticIntValueWorkOne" + Interlocked.Increment(ref StaticIntValueWorkOne));
    // And then more stuff that is async here
    await Task.Yield();
    Thread.Sleep(10000);
    Console.WriteLine(DateTime.UtcNow + " StaticIntValueWorkTwo" + Interlocked.Increment(ref StaticIntValueWorkTwo));
}
我得到这个(预期)输出:
3/20/2015 11:01:53 AM StaticIntValueWorkOne1
3/20/2015 11:01:53 AM StaticIntValueWorkOne5
3/20/2015 11:01:53 AM StaticIntValueWorkOne4
3/20/2015 11:01:53 AM StaticIntValueWorkOne2
3/20/2015 11:01:53 AM StaticIntValueWorkOne3
3/20/2015 11:01:53 AM StaticIntValueWorkOne6
3/20/2015 11:01:53 AM StaticIntValueWorkOne9
3/20/2015 11:01:53 AM StaticIntValueWorkOne10
3/20/2015 11:01:53 AM StaticIntValueWorkOne7
3/20/2015 11:01:53 AM StaticIntValueWorkOne8
3/20/2015 11:02:03 AM StaticIntValueWorkTwo1
3/20/2015 11:02:03 AM StaticIntValueWorkTwo3
3/20/2015 11:02:03 AM StaticIntValueWorkTwo2
3/20/2015 11:02:03 AM StaticIntValueWorkTwo4
3/20/2015 11:02:03 AM StaticIntValueWorkTwo5
3/20/2015 11:02:13 AM StaticIntValueWorkTwo6
3/20/2015 11:02:13 AM StaticIntValueWorkTwo7
3/20/2015 11:02:13 AM StaticIntValueWorkTwo8
3/20/2015 11:02:13 AM StaticIntValueWorkTwo9
3/20/2015 11:02:13 AM StaticIntValueWorkTwo10
如果要限制异步代码的并发性,请查看SemaphoreSlim或TPL数据流.