Thread safety of ConcurrentQueue in C#

skm*_*skm 6 c# multithreading

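I have a MessagesManager thread to which different threads can send messages. This MessagesManager thread is then responsible for publishing those messages inside SendMessageToTcpIP() (the starting point of the MessagesManager thread).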

class MessagesManager : IMessageNotifier
{
    //private
    private readonly AutoResetEvent _waitTillMessageQueueEmptyARE = new AutoResetEvent(false);
    private ConcurrentQueue<string> MessagesQueue = new ConcurrentQueue<string>(); 

    public void PublishMessage(string Message)
    {
        MessagesQueue.Enqueue(Message);
        _waitTillMessageQueueEmptyARE.Set();
    }

    public void SendMessageToTcpIP()
    {
        //keep waiting till a new message comes
        while (MessagesQueue.Count() == 0)
        {
            _waitTillMessageQueueEmptyARE.WaitOne();
        }

        //Copy the concurrent queue into a local queue, dequeuing each item as it is moved into the local queue
        Queue<string> localMessagesQueue = new Queue<string>();

        while (!MessagesQueue.IsEmpty)
        {
            string message;
            bool isRemoved = MessagesQueue.TryDequeue(out message);
            if (isRemoved)
                localMessagesQueue.Enqueue(message);
        }

        //Use the Local Queue for further processing
        while (localMessagesQueue.Count() != 0)
        {
            TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
            Thread.Sleep(2000);
        }
    }
}

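Different threads (3-4) send their messages by calling PublishMessage(string Message), all using the same MessagesManager object. When a message arrives, I push it onto the concurrent queue and notify SendMessageToTcpIP() by calling _waitTillMessageQueueEmptyARE.Set();. Inside SendMessageToTcpIP(), I copy the messages from the concurrent queue into a local queue and then publish them one by one.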

Question: Is it safe to enqueue and dequeue this way? Could there be any strange side effects?

Evk*_*Evk 4

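While this is probably thread-safe, .NET has built-in classes that help with the "multiple publishers, one consumer" pattern, such as BlockingCollection. You can rewrite your class like this: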

class MessagesManager : IDisposable {
    // note that your ConcurrentQueue is still in play, passed to constructor
    private readonly BlockingCollection<string> MessagesQueue = new BlockingCollection<string>(new ConcurrentQueue<string>());

    public MessagesManager() {
        // start consumer thread here
        new Thread(SendLoop) {
            IsBackground = true
        }.Start();
    }

    public void PublishMessage(string Message) {
        // no need to notify here, will be done for you
        MessagesQueue.Add(Message);
    }

    private void SendLoop() {
        // this blocks until new items are available
        foreach (var item in MessagesQueue.GetConsumingEnumerable()) {
            // ensure that you handle exceptions here, or whole thing will break on exception
            TcpIpMessageSenderClient.ConnectAndSendMessage(item.PadRight(80, ' '));
            Thread.Sleep(2000); // only if you are sure this is required 
        }
    }

    public void Dispose() {            
        // this will "complete" GetConsumingEnumerable, so your thread will complete
        MessagesQueue.CompleteAdding();
        MessagesQueue.Dispose();
    }
}
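Two details of the class above may be worth hardening. The comment in SendLoop asks for exception handling, and Dispose() disposes the collection right after CompleteAdding(), while the consumer thread may still be draining it (BlockingCollection.Dispose is documented as not thread-safe). Below is a minimal sketch of one way to handle both, not a definitive implementation. The names SafeMessagesManager and Demo are hypothetical, and the send action is injected as an Action<string> as a stand-in for TcpIpMessageSenderClient.ConnectAndSendMessage so the example runs on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical variant of the class above. The send action is injected
// (an assumption) so the sketch does not need TcpIpMessageSenderClient.
class SafeMessagesManager : IDisposable
{
    private readonly BlockingCollection<string> _queue =
        new BlockingCollection<string>(new ConcurrentQueue<string>());
    private readonly Action<string> _send;
    private readonly Thread _consumer;

    public SafeMessagesManager(Action<string> send)
    {
        _send = send;
        _consumer = new Thread(SendLoop) { IsBackground = true };
        _consumer.Start();
    }

    // Safe to call from several publisher threads at once.
    public void PublishMessage(string message)
    {
        _queue.Add(message);
    }

    private void SendLoop()
    {
        // Blocks until an item is available; ends after CompleteAdding()
        // once the remaining items have been consumed.
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            try
            {
                _send(item.PadRight(80, ' '));
            }
            catch (Exception ex)
            {
                // A failed send is reported and skipped so the loop keeps running.
                Console.Error.WriteLine("Send failed: " + ex.Message);
            }
        }
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // no more items; lets the consumer loop finish
        _consumer.Join();        // wait for the consumer before disposing the collection
        _queue.Dispose();
    }
}

static class Demo
{
    // Three publisher threads send two messages each.
    // Returns the messages the consumer thread actually sent.
    public static List<string> Run()
    {
        // Written only by the consumer thread; read only after Dispose() has joined it.
        var sent = new List<string>();
        using (var manager = new SafeMessagesManager(sent.Add))
        {
            var publishers = new List<Thread>();
            for (int i = 0; i < 3; i++)
            {
                int id = i; // copy the loop variable for the closure
                var t = new Thread(() =>
                {
                    manager.PublishMessage("publisher " + id + ", message 1");
                    manager.PublishMessage("publisher " + id + ", message 2");
                });
                publishers.Add(t);
                t.Start();
            }
            foreach (var t in publishers) t.Join();
        }
        return sent;
    }

    static void Main()
    {
        Console.WriteLine(Run().Count); // prints 6
    }
}
```

Joining the consumer in Dispose() means the caller blocks until every queued message has been sent. If that wait is not acceptable, a CancellationToken passed to GetConsumingEnumerable is another option, at the cost of possibly dropping messages still in the queue.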