等待和任务并发

Nic*_*ros 2 c# async-await

我正在尝试创建一个TaskScheduler,它可以限制可以同时运行的线程数.我正在使用这个例子.问题是我看到了一些我不理解的行为.

如果我像示例一样创建我的类:

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(5);
TaskFactory factory = new TaskFactory(lcts);
Run Code Online (Sandbox Code Playgroud)

然后像这样运行:

foreach (int i = 0; i < 10; ++i) 
{
    factory.StartNew(() => DoWork());
}
Run Code Online (Sandbox Code Playgroud)

工作看起来像这样:

private async Task DoWork()
{
    // Do some stuff here like
    StaticIntValueWorkOne++;

    // And then more stuff that is async here
    int someValue = await DoAdditionalWorkAsync();
    Thread.Sleep(10000);
    StaticIntValueWorkTwo++;
}
Run Code Online (Sandbox Code Playgroud)

我看到的是StaticIntValueWorkOne立即增加10次,而StaticIntValueWorkTwo只增加一次.然后在10秒后我看到StaticIntValueWorkTwo增量,然后每10秒后增加.我没有得到的是等待DoAdditionalWorkAsync()和并发做的事情.我以为我会看到StaticIntValueWorkOne增加一次,然后StaticIntValueWorkTwo会增加一次.我错过了什么?我只需要一个awaitfactor.StartNew()

Ste*_*ary 5

我正在尝试创建一个TaskScheduler,它可以限制可以同时运行的线程数.

您可能想直接跳到答案.;)

var scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 5)
    .ConcurrentScheduler;
Run Code Online (Sandbox Code Playgroud)

我没有得到的是DoAdditionalWorkAsync()上的await正在使用并发.

任务调度程序仅适用于执行代码.当一个async方法在任务调度程序上执行时,您可以将其视为分解为多个任务,每个await点都有一个中断.默认情况下,awaitasync方法将重新进入其任务调度程序.该async方法在进行时不在任务调度程序中await.

因此,当方法正在进行时,调度限制(一次5个)根本不适用await.因此,在您DoWork的方法中,该方法将首先递增变量,然后屈服于任务调度程序.虽然屈服了,但它并没有"计入"你的并发限制.稍后,当该方法恢复时,它将阻塞线程(它会"计数")并增加第二个变量.

使用此代码:

private static void Main(string[] args)
{
    var scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 5)
        .ConcurrentScheduler;
    TaskFactory factory = new TaskFactory(scheduler);

    for (int i = 0; i < 10; ++i)
    {
        factory.StartNew(() => DoWork());
    }

    Console.ReadKey();
}

private static int StaticIntValueWorkOne, StaticIntValueWorkTwo;

private static async Task DoWork()
{
    // Do some stuff here like
    Console.WriteLine(DateTime.UtcNow + " StaticIntValueWorkOne" + Interlocked.Increment(ref StaticIntValueWorkOne));

    // And then more stuff that is async here
    await Task.Yield();
    Thread.Sleep(10000);
    Console.WriteLine(DateTime.UtcNow + " StaticIntValueWorkTwo" + Interlocked.Increment(ref StaticIntValueWorkTwo));
}
Run Code Online (Sandbox Code Playgroud)

我得到这个(预期)输出:

3/20/2015 11:01:53 AM StaticIntValueWorkOne1
3/20/2015 11:01:53 AM StaticIntValueWorkOne5
3/20/2015 11:01:53 AM StaticIntValueWorkOne4
3/20/2015 11:01:53 AM StaticIntValueWorkOne2
3/20/2015 11:01:53 AM StaticIntValueWorkOne3
3/20/2015 11:01:53 AM StaticIntValueWorkOne6
3/20/2015 11:01:53 AM StaticIntValueWorkOne9
3/20/2015 11:01:53 AM StaticIntValueWorkOne10
3/20/2015 11:01:53 AM StaticIntValueWorkOne7
3/20/2015 11:01:53 AM StaticIntValueWorkOne8
3/20/2015 11:02:03 AM StaticIntValueWorkTwo1
3/20/2015 11:02:03 AM StaticIntValueWorkTwo3
3/20/2015 11:02:03 AM StaticIntValueWorkTwo2
3/20/2015 11:02:03 AM StaticIntValueWorkTwo4
3/20/2015 11:02:03 AM StaticIntValueWorkTwo5
3/20/2015 11:02:13 AM StaticIntValueWorkTwo6
3/20/2015 11:02:13 AM StaticIntValueWorkTwo7
3/20/2015 11:02:13 AM StaticIntValueWorkTwo8
3/20/2015 11:02:13 AM StaticIntValueWorkTwo9
3/20/2015 11:02:13 AM StaticIntValueWorkTwo10
Run Code Online (Sandbox Code Playgroud)

如果要限制异步代码的并发性,请查看SemaphoreSlim或TPL数据流.