C# 线程处理消息队列

Her*_*rma 4 c# multithreading message-queue

我有以下要求 -

  1. 接收消息并将其排入队列的线程。
  2. 处理排队消息的线程。

现在,第二个线程必须始终处于活动状态 - 为此我使用了无限 while 循环,如下所示:

private AutoResetEvent messageReset;
private Queue<byte[]> messageQueue;

//thread 2 method
private void ProcessIncomingMessages()
{
 messageReset.WaitOne(); //wait for signal
 while(true)
 {
    if (messageQueue.Count > 0)
    {
        //processing messages
    }
  }
}
public void SubmitMessageForProcessing(byte[] message){
            messageQueue.Enqueue(message); //enqueue message

            // Release the thread
            messageReset.Set();
}
Run Code Online (Sandbox Code Playgroud)

现在,这个无限 while 循环使 CPU 利用率非常高。有没有什么办法可以降低CPU利用率

注意:我无法添加任何 thread.sleep 语句,因为传入消息将以最小的延迟显示在 UI 上。

Fre*_*gar 5

只需使用 aBlockingCollection代替Queue. 它是线程安全的,并且会一直阻塞,Take直到某个工作人员添加一个项目:

// Use default constructor to make BlockingCollection FIFO
private BlockingCollection<byte[]> messageQueue = new BlockingCollection<byte[]>();

//thread 2 method
private void ProcessIncomingMessages()
{
    while (true)
    {
        //will block until thread1 Adds a message
        byte[] message = messageQueue.Take();

        //processing messages
    }
}
public void SubmitMessageForProcessing(byte[] message)
{
    messageQueue.Add(message); //enqueue message
}
Run Code Online (Sandbox Code Playgroud)

EDIT2:我忘了提及,通过使用默认构造函数 BlockingCollection将是 FIFO。它实际上会使用一个ConcurrentQueue作为项目容器。

如果你想BlockingCollection表现得像 LIFO 集合,你需要将IProducerConsumerCollectionLIFO 传递给构造函数。通常的课程是ConcurrentStack


编辑:一些关于如何Queue不是线程安全的解释,这可能会导致当前代码出现问题。

来自 Microsoft 文档Queue

只要集合不被修改,队列就可以同时支持多个读取器。

这意味着您无法同时从多个线程读取和写入。

请看以下示例,该示例也适用于建议仅搬入messageReset.WaitOne()您的while(true)街区的其他答案。

  1. SubmitMessageForProcessing被呼叫并发出信号messageReset.Set()
  2. 线程 2 变得活动并尝试读取数据。
  3. 当线程2读取数据时SubmitMessageForProcessing被第二次调用。
  4. 现在您同时进行写入和读取,导致意外行为(通常是某种异常)