C#生产者/消费者

lbo*_*ard 26 c# design-patterns monitor producer-consumer

我最近遇到了生产者/消费者模式c#实现.它非常简单,(至少对我来说)非常优雅.

它似乎是在2006年左右设计的,所以我想知道这种实施是否
安全
- 仍然适用

代码如下(原始代码参考http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375)

using System;  
using System.Collections;  
using System.Threading;

public class Test
{  
    static ProducerConsumer queue;

    static void Main()
    {
        queue = new ProducerConsumer();
        new Thread(new ThreadStart(ConsumerJob)).Start();

        Random rng = new Random(0);
        for (int i=0; i < 10; i++)
        {
            Console.WriteLine ("Producing {0}", i);
            queue.Produce(i);
            Thread.Sleep(rng.Next(1000));
        }
    }

    static void ConsumerJob()
    {
        // Make sure we get a different random seed from the
        // first thread
        Random rng = new Random(1);
        // We happen to know we've only got 10 
        // items to receive
        for (int i=0; i < 10; i++)
        {
            object o = queue.Consume();
            Console.WriteLine ("\t\t\t\tConsuming {0}", o);
            Thread.Sleep(rng.Next(1000));
        }
    }
}

public class ProducerConsumer
{
    readonly object listLock = new object();
    Queue queue = new Queue();

    public void Produce(object o)
    {
        lock (listLock)
        {
            queue.Enqueue(o);

            // We always need to pulse, even if the queue wasn't
            // empty before. Otherwise, if we add several items
            // in quick succession, we may only pulse once, waking
            // a single thread up, even if there are multiple threads
            // waiting for items.            
            Monitor.Pulse(listLock);
        }
    }

    public object Consume()
    {
        lock (listLock)
        {
            // If the queue is empty, wait for an item to be added
            // Note that this is a while loop, as we may be pulsed
            // but not wake up before another thread has come in and
            // consumed the newly added object. In that case, we'll
            // have to wait for another pulse.
            while (queue.Count==0)
            {
                // This releases listLock, only reacquiring it
                // after being woken up by a call to Pulse
                Monitor.Wait(listLock);
            }
            return queue.Dequeue();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Jon*_*eet 33

代码比那个旧 - 我在.NET 2.0问世之前写了一段时间.该概念生产者/消费者队列的方式相比,虽然旧的:)

是的,就我所知,该代码是安全的 - 但它有一些不足之处:

  • 它不是通用的.现代版本肯定是通用的.
  • 它无法阻止队列.停止队列的一种简单方法(使所有消费者线程退出)是具有可以放入队列的"停止工作"令牌.然后,您可以添加与线程一样多的令牌.或者,您有一个单独的标志,表示您要停止.(这允许其他线程在完成队列中的所有当前工作之前停止.)
  • 如果工作量很小,一次只能完成一项工作可能不是最有效的工作.

诚实地说,代码背后的想法比代码本身更重要.


das*_*ton 28

您可以执行类似以下代码段的操作.它是通用的,并且有一个方法可以将空值(或者你想使用的任何标志)排入队列,以告诉工作线程退出.

代码来自这里:http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication1
{

    public class TaskQueue<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();

        public TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                Console.Write(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 这是迄今为止生产者消费模式的最佳实施.我最近在我的多线程应用程序中使用了它,并且即使在1000-1500个线程上也能正常工作. (3认同)
  • @Inactivist你可以将一个Action <T>添加到ctor,保存为字段_action,并在你执行Dequeue()之后调用它,如下所示:_action(task); (2认同)

小智 17

回到那一天,我从上面的代码片段和它的文章系列中学习了Monitor.Wait/Pulse如何工作(以及一般的线程).正如乔恩所说,它具有很大的价值,确实安全可行.

但是,从.NET 4开始,框架中有一个生产者 - 消费者队列实现.我只是自己发现它,但到目前为止它完成了我需要的一切.