当有大量消息排队时,从MSMQ读取速度会变慢

Man*_*lin 8 .net c# msmq message-queue

简短介绍

我有一个基于SEDA的系统,并使用MSMQ在不同的应用程序/服务之间进行通信(事件触发).

其中一个服务按文件获取消息,因此我有一个文件监听器,它读取文件内容并将其插入队列(或实际上是4个不同的队列,但这对第一个问题不是很重要).

服务器是Windows Server 2008

第一个问题 - 阅读速度慢下来

我在另一端读取这些消息的应用程序通常每秒从队列中读取大约20条消息,但是当发布消息的服务开始排队数千条消息时,读取​​将关闭,并且读取应用程序仅读取2-4条消息第二.当没有发布到队列时,读取的应用程序可以再次读取每秒最多20条消息.

阅读应用程序中的代码非常简单,在C#中开发,我在System.Messaging中使用Read(TimeSpan超时)函数.

问:为什么在队列中发布大量消息时读取速度会变慢?

第二个问题 - TPS的局限性

另一个问题是阅读本身.如果我使用1或5个线程从队列中读取,那么每秒可以读取的消息数似乎没有区别.我也尝试实现一个"循环解决方案",其中邮政服务发布到一组随机的4个队列,并且读取的应用程序有一个线程监听这些队列中的每一个,但即使我仍然只有20个TPS从1个队列中读取1个线程,1个队列,4个线程或4个队列(每个队列一个线程).

我知道线程中的处理大约需要50毫秒,所以如果当时只处理一条消息,那么20 TPS是完全正确的,但多线程的线索应该是并行处理消息而不是顺序消息.

服务器上有大约110个不同的队列.

问:为什么即使使用多线程和使用多个队列,我也无法在队列中收到超过20条消息?

这是今天运行的代码:

// There are 4 BackgroundWorkers running this function
void bw_DoWork(object sender, DoWorkEventArgs e) 
{
    using(var mq = new MessageQueue(".\\content"))
    {
        mq.Formatter = new BinaryMessageFormatter();

        // ShouldIRun is a bool set to false by OnStop()
        while(ShouldIRun)
        {
            try
            {
                using(var msg = mq.Receive(new TimeSpan(0,0,2))
                {
                    ProcessMessageBody(msg.Body); // This takes 50 ms to complete
                }
            }
            catch(MessageQueueException mqe)
            {
               // This occurs every time TimeSpan in Receive() is reached
               if(mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) 
                   continue;
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

但即使有4个线程,似乎所有人都等待该函数再次进入"接收"点.我也尝试使用4个不同的队列(content1,content2,content3和content4),但我仍然每50毫秒处理1条消息.

这与Receive()中的TimeSpan有什么关系,和/或是否可以省略它?

另一个问题是如果使用私有队列,in​​stad公共会解决什么问题?

Ken*_*enL 2

你为什么使用时间跨度?- 这是一件坏事,原因如下。

在开发服务和队列时,您需要以广告安全的方式进行编程。队列中的每个项目都会产生一个新线程。使用时间跨度会强制每个线程使用单个计时器事件线程。这些事件必须在事件线程中等待轮到它们。

标准是每个队列事件 1 个线程 - 这通常是您的 System.Messaging.ReceiveCompletedEventArgs 事件。另一个线程是您的 onStart 事件...

20 个线程或每秒 20 次读取可能是正确的。通常,在 .net 中,线程池一次只能生成 36 个线程。

我的建议是删除计时器事件,让您的队列只处理数据。

做一些类似这样的事情;

namespace MessageService 
{ 

public partial class MessageService : ServiceBase 

{ 

    public MessageService() 

    { 

        InitializeComponent(); 

    } 



    private string MessageDirectory = ConfigurationManager.AppSettings["MessageDirectory"]; 

    private string MessageQueue = ConfigurationManager.AppSettings["MessageQueue"]; 



    private System.Messaging.MessageQueue messageQueue = null; 



    private ManualResetEvent manualResetEvent = new ManualResetEvent(true); 





    protected override void OnStart(string[] args) 

    { 

        // Create directories if needed 

        if (!System.IO.Directory.Exists(MessageDirectory)) 

            System.IO.Directory.CreateDirectory(MessageDirectory); 



        // Create new message queue instance 

        messageQueue = new System.Messaging.MessageQueue(MessageQueue); 



        try 

        {    

            // Set formatter to allow ASCII text 

            messageQueue.Formatter = new System.Messaging.ActiveXMessageFormatter(); 

            // Assign event handler when message is received 

            messageQueue.ReceiveCompleted += 

                new System.Messaging.ReceiveCompletedEventHandler(messageQueue_ReceiveCompleted); 

            // Start listening 



            messageQueue.BeginReceive(); 

        } 

        catch (Exception e) 

        { 



        } 

    } 



    protected override void OnStop() 

    { 

        //Make process synchronous before closing the queue 

        manualResetEvent.WaitOne(); 





        // Clean up 

        if (this.messageQueue != null) 

        { 

            this.messageQueue.Close(); 

            this.messageQueue = null; 

        } 

    } 



    public void messageQueue_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e) 

    { 

        manualResetEvent.Reset(); 

        System.Messaging.Message completeMessage = null; 

        System.IO.FileStream fileStream = null; 

        System.IO.StreamWriter streamWriter = null; 

        string fileName = null; 

        byte[] bytes = new byte[2500000]; 

        string xmlstr = string.Empty;                

            try 

            { 

                // Receive the message 

                completeMessage = this.messageQueue.EndReceive(e.AsyncResult);                    

                completeMessage.BodyStream.Read(bytes, 0, bytes.Length); 



                System.Text.ASCIIEncoding ascii = new System.Text.ASCIIEncoding(); 



                long len = completeMessage.BodyStream.Length; 

                int intlen = Convert.ToInt32(len);                   

                xmlstr = ascii.GetString(bytes, 0, intlen);                   

            } 

            catch (Exception ex0) 

            { 

                //Error converting message to string                    

            } 

        }
Run Code Online (Sandbox Code Playgroud)

  • 您是否可以参考该声明的一些文档 - “为什么要使用时间跨度? - 这是一件坏事,这就是原因。” 快速查找了一些 doco,但找不到任何东西。 (3认同)