为什么Monitor.PulseAll会在信号线程中产生"踩台阶"延迟模式?

Chu*_*huu 52 .net c# multithreading latency

在使用Monitor.PulseAll()进行线程同步的库中,我注意到从PulseAll(...)被调用到线程被唤醒的时间的延迟似乎遵循"踩台阶"分布 - 极其大步.被唤醒的线程几乎没有工作; 并几乎立即回到监视器上等待.例如,在具有12个核心的盒子上,24个线程在监视器上等待(2x Xeon5680/Gulftown;每个处理器6个物理核心; HT禁用),Pulse和线程唤醒之间的延迟是这样的:

使用Monitor.PulseAll()的延迟;  第三方图书馆

前12个线程(注意我们有12个内核)需要30到60微秒才能响应.然后我们开始获得非常大的跳跃; 高原在700,1300,1900和2600微秒左右.

我能够使用下面的代码成功地独立于第三方库重新创建此行为.这段代码的作用是启动大量线程(更改numThreads参数),只需在监视器上等待,读取时间戳,将其记录到ConcurrentSet,然后立即返回Waiting.一旦第二个PulseAll()唤醒所有线程.它执行此操作20次,并将第10次迭代的延迟报告给控制台.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace PulseAllTest
{
    class Program
    {
        static long LastTimestamp;
        static long Iteration;
        static object SyncObj = new object();
        static Stopwatch s = new Stopwatch();
        static ConcurrentBag<Tuple<long, long>> IterationToTicks = new ConcurrentBag<Tuple<long, long>>();

        static void Main(string[] args)
        {
            long numThreads = 32;

            for (int i = 0; i < numThreads; ++i)
            {
                Task.Factory.StartNew(ReadLastTimestampAndPublish, TaskCreationOptions.LongRunning);
            }

            s.Start();
            for (int i = 0; i < 20; ++i)
            {
                lock (SyncObj)
                {
                    ++Iteration;
                    LastTimestamp = s.Elapsed.Ticks;
                    Monitor.PulseAll(SyncObj);
                }
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }

            Console.WriteLine(String.Join("\n",
                from n in IterationToTicks where n.Item1 == 10 orderby n.Item2 
                    select ((decimal)n.Item2)/TimeSpan.TicksPerMillisecond));
            Console.Read();
        }

        static void ReadLastTimestampAndPublish()
        {
            while(true)
            {
                lock(SyncObj)
                {
                    Monitor.Wait(SyncObj);
                }
                IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

使用上面的代码,这里是启用8核/ w超线程(即任务管理器中的16个核心)和32个线程(*2x Xeon5550/Gainestown;每个处理器4个物理核心; HT启用)的盒子上的延迟示例:

延迟使用Monitor.PulseAll(),示例代码

编辑:为了尝试将NUMA排除在等式之外,下面是运行示例程序的图表,在Core i7-3770(Ivy Bridge)上有16个线程; 4个物理核心; HT启用:

延迟使用Monitor.PulseAll(),示例代码,没有NUMA

任何人都可以解释为什么Monitor.PulseAll()以这种方式行事?

EDIT2:

为了尝试表明这种行为不是同时唤醒一堆线程所固有的,我已经使用Events复制了测试程序的行为; 而不是测量PulseAll()的延迟,我正在测量ManualResetEvent.Set()的延迟.代码创建了许多工作线程,然后在同一个ManualResetEvent对象上等待ManualResetEvent.Set()事件.当事件被触发时,他们会进行延迟测量,然后立即等待他们自己的每个线程AutoResetEvent.在下一次迭代(之前500ms)之前,ManualResetEvent是Reset(),然后每个AutoResetEvent都是Set(),因此线程可以返回等待共享的ManualResetEvent.

我犹豫是否发布这个因为它可能是一个巨大的红色听证会(我没有声称事件和监视器表现相似)加上它使用一些绝对可怕的做法让事件表现得像一个监视器(我喜欢/讨厌看到我的如果我将其提交给代码审查,那么同事就会这样做; 但我认为结果很有启发性.

该测试在与原始测试相同的机器上进行; 2xXeon5680/Gulftown; 每个处理器6个内核(总共12个内核); 超线程禁用.

ManualResetEventLatency

如果不明显,这与Monitor.PulseAll有多么不同; 这是覆盖在最后一个图上的第一个图:

ManualResetEventLatency与Monitor Latency

用于生成这些测量的代码如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace MRETest
{
    class Program
    {
        static long LastTimestamp;
        static long Iteration;
        static ManualResetEventSlim MRES = new ManualResetEventSlim(false);
        static List<ReadLastTimestampAndPublish> Publishers = 
            new List<ReadLastTimestampAndPublish>();
        static Stopwatch s = new Stopwatch();
        static ConcurrentBag<Tuple<long, long>> IterationToTicks = 
            new ConcurrentBag<Tuple<long, long>>();

        static void Main(string[] args)
        {
            long numThreads = 24;
            s.Start();

            for (int i = 0; i < numThreads; ++i)
            {
                AutoResetEvent ares = new AutoResetEvent(false);
                ReadLastTimestampAndPublish spinner = new ReadLastTimestampAndPublish(
                    new AutoResetEvent(false));
                Task.Factory.StartNew(spinner.Spin, TaskCreationOptions.LongRunning);
                Publishers.Add(spinner);
            }

            for (int i = 0; i < 20; ++i)
            {
                ++Iteration;
                LastTimestamp = s.Elapsed.Ticks;
                MRES.Set();
                Thread.Sleep(500);
                MRES.Reset();
                foreach (ReadLastTimestampAndPublish publisher in Publishers)
                {
                    publisher.ARES.Set();
                }
                Thread.Sleep(500);
            }

            Console.WriteLine(String.Join("\n",
                from n in IterationToTicks where n.Item1 == 10 orderby n.Item2
                    select ((decimal)n.Item2) / TimeSpan.TicksPerMillisecond));
            Console.Read();
        }

        class ReadLastTimestampAndPublish
        {
            public AutoResetEvent ARES { get; private set; }

            public ReadLastTimestampAndPublish(AutoResetEvent ares)
            {
                this.ARES = ares;
            }

            public void Spin()
            {
                while (true)
                {
                    MRES.Wait();
                    IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
                    ARES.WaitOne();
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Mic*_*tin 1

这些版本之间的一个区别是,在 PulseAll 情况下 - 线程立即重复循环,再次锁定对象。

你有12个核心,所以有12个线程正在运行,执行循环,然后再次进入循环,锁定对象(一个接一个),然后进入等待状态。其他线程一直在等待。在 ManualEvent 情况下,您有两个事件,因此线程不会立即重复循环,而是在 ARES 事件上被阻塞 - 这允许其他线程更快地获取锁所有权。

我通过在 ReadLastTimestampAndPublish 的循环末尾添加睡眠来模拟 PulseAll 中的类似行为。这使得其他线程可以更快地锁定syncObj,并且似乎提高了我从程序中获得的数字。

static void ReadLastTimestampAndPublish()
{
    while(true)
    {
        lock(SyncObj)
        {
            Monitor.Wait(SyncObj);
        }
        IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
        Thread.Sleep(TimeSpan.FromMilliseconds(100));   // <===
    }
}
Run Code Online (Sandbox Code Playgroud)