使用线程处理队列的最有效方法

Mat*_*rts 15 c# multithreading task-parallel-library c#-4.0

我有一个队列,其上放置了待处理的傅立叶变换请求(相对耗时的操作) - 在某些情况下,我们每秒可以获得数千个变换请求,因此它必须快速.

我正在升级旧代码以使用.net 4,以及移植到TPL.我想知道处理这个队列的最有效(最快的吞吐量)方式是什么样的.我想使用所有可用的核心.

目前我正在尝试使用BlockingCollection.我创建了一个队列处理程序类,它产生4个任务,阻塞BlockingCollection并等待传入​​的工作.然后他们处理挂起的转换.码:

public class IncomingPacketQueue : IDisposable
    {
        BlockingCollection<IncomingPacket> _packetQ = new BlockingCollection<IncomingPacket>();

        public IncomingPacketQueue(int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }

        public void EnqueueSweep(IncomingPacket incoming)
        {
            _packetQ.Add(incoming);
        }

        private void Consume()
        {
            foreach (var sweep in _packetQ.GetConsumingEnumerable())
            {
                //do stuff
                var worker = new IfftWorker();
                Trace.WriteLine("  Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
                worker.DoIfft(sweep);                

            }
        }

        public int QueueCount
        {
            get
            {
                return _packetQ.Count;
            }
        }

    #region IDisposable Members

    public void Dispose()
    {
        _packetQ.CompleteAdding();
    }

    #endregion
    }
Run Code Online (Sandbox Code Playgroud)

这看起来是一个很好的解决方案吗?它似乎最大化了所有核心 - 虽然我目前不确定我应该在我的构造函数中产生多少工人.

Jim*_*hel 7

这看起来很合理.我发现BlockingCollection它很快.我用它来处理每秒数万个请求.

如果您的应用程序受处理器限制,那么您可能不希望创建比核心更多的工作者.当然,您不希望创建比核心更多的工作者.在四核机器上,如果您希望大部分时间都花费在FFT上,那么四个工作人员将占用所有CPU.更多的工作者只是意味着你有更多的线程上下文切换来处理.TPL通常会为您平衡这一点,但是当您无法处理超过少数人时,没有理由创建100名工作人员.

我建议您使用3,4,5,6,7和8名工人进行测试.看看哪一个给你最好的吞吐量.