Threadsafe FIFO队列/缓冲区

use*_*560 8 c# queue multithreading buffer .net-3.5

我需要实现一种任务缓冲区.基本要求是:

  • 在单个后台线程中处理任务
  • 从多个线程接收任务
  • 处理所有收到的任务,即确保在收到停止信号后缓冲区的缓冲任务耗尽
  • 必须保持每个线程接收的任务顺序

我正在考虑使用如下的队列来实现它.希望得到有关实施的反馈.还有其他更明智的想法来实现这样的事情吗?

public class TestBuffer
{
    private readonly object queueLock = new object();
    private Queue<Task> queue = new Queue<Task>();
    private bool running = false;

    public TestBuffer()
    {
    }

    public void start()
    {
        Thread t = new Thread(new ThreadStart(run));
        t.Start();
    }

    private void run()
    {
        running = true;

        bool run = true;
        while(run)
        {
            Task task = null;
            // Lock queue before doing anything
            lock (queueLock)
            {
                // If the queue is currently empty and it is still running
                // we need to wait until we're told something changed
                if (queue.Count == 0 && running)
                {
                    Monitor.Wait(queueLock);
                }

                // Check there is something in the queue
                // Note - there might not be anything in the queue if we were waiting for something to change and the queue was stopped
                if (queue.Count > 0)
                {
                    task = queue.Dequeue();
                }
            }

            // If something was dequeued, handle it
            if (task != null)
            {
                handle(task);
            }

            // Lock the queue again and check whether we need to run again
            // Note - Make sure we drain the queue even if we are told to stop before it is emtpy
            lock (queueLock)
            {
                run = queue.Count > 0 || running;
            }
        }
    }

    public void enqueue(Task toEnqueue)
    {
        lock (queueLock)
        {
            queue.Enqueue(toEnqueue);
            Monitor.PulseAll(queueLock);
        }
    }

    public void stop()
    {
        lock (queueLock)
        {
            running = false;
            Monitor.PulseAll(queueLock);
        }
    }

    public void handle(Task dequeued)
    {
        dequeued.execute();
    }
}
Run Code Online (Sandbox Code Playgroud)

Eri*_* J. 7

您可以使用开箱即用的BlockingCollection实际处理此问题.

它旨在拥有一个或多个生产者,以及一个或多个消费者.在您的情况下,您将拥有多个生产者和一个消费者.

当您收到停止信号时,请使用该信号处理程序

  • 信号生成器线程停止
  • 在BlockingCollection实例上调用CompleteAdding

消费者线程将继续运行,直到删除并处理所有排队的项目,然后它将遇到BlockingCollection完成的条件.当线程遇到该条件时,它就会退出.


Qer*_*rts 5

实际上,您应该考虑ConcurrentQueue,它是FIFO。如果不合适,请在Thread-Safe Collections中尝试其一些亲属。使用这些可以避免一些风险。