C#阻塞FIFO队列可以泄漏消息吗?

usr*_*ΛΩΝ 2 c# queue concurrency fifo interlocked-increment

我正在开发一个学术开源项目,现在我需要在C#中创建一个快速阻塞FIFO队列.我的第一个实现只是在读者的信号量中包含一个同步队列(带有动态扩展),然后我决定以下面的方式重新实现(理论上更快)

public class FastFifoQueue<T>
{
    private T[] _array;
    private int _head, _tail, _count;
    private readonly int _capacity;
    private readonly Semaphore _readSema, _writeSema;

    /// <summary>
    /// Initializes FastFifoQueue with the specified capacity
    /// </summary>
    /// <param name="size">Maximum number of elements to store</param>
    public FastFifoQueue(int size)
    {
        //Check if size is power of 2
        //Credit: http://stackoverflow.com/questions/600293/how-to-check-if-a-number-is-a-power-of-2
        if ((size & (size - 1)) != 0)
            throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");

        _capacity = size;
        _array = new T[size];
        _count = 0;
        _head = int.MinValue; //0 is the same!
        _tail = int.MinValue;

        _readSema = new Semaphore(0, _capacity);
        _writeSema = new Semaphore(_capacity, _capacity);
    }

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        Interlocked.Exchange(ref _array[index], item);
        Interlocked.Increment(ref _count);
        _readSema.Release();
    }

    public T Dequeue()
    {
        _readSema.WaitOne();
        int index = Interlocked.Increment(ref _tail);
        index %= _capacity;
        if (index < 0) index += _capacity;
        T ret = Interlocked.Exchange(ref _array[index], null);
        Interlocked.Decrement(ref _count);
        _writeSema.Release();

        return ret;
    }

    public int Count
    {
        get
        {
            return _count;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我们在教科书上找到的静态数组的经典FIFO队列实现.它被设计成以原子方式递增指针,并且因为当达到(capacity-1)时我无法使指针返回到零,所以我计算模数.理论上,使用Interlocked与执行增量之前的锁定相同,并且由于存在信号量,因此多个生产者/消费者可以进入队列,但一次只能有一个能够修改队列指针.首先,因为Interlocked.Increment首先递增,然后返回,我已经明白我只能使用后增量值并从数组中的位置1开始存储项目.这不是问题,当我达到一定值时,我会回到0

有什么问题呢?您不会相信,在重负载下运行,有时队列会返回NULL值.我确定,重复一遍,我确定,没有方法将null排入队列.这绝对是正确的,因为我试图在Enqueue中进行空检查以确保,并且没有抛出任何错误.我用Visual Studio创建了一个测试用例(顺便说一句,我使用像maaaaaaaany人这样的双核CPU)

    private int _errors;

    [TestMethod()]
    public void ConcurrencyTest()
    {
        const int size = 3; //Perform more tests changing it
        _errors = 0;
        IFifoQueue<object> queue = new FastFifoQueue<object>(2048);
        Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
        Thread[] producers = new Thread[size], consumers = new Thread[size];

        for (int i = 0; i < size; i++)
        {
            producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
            consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
            producers[i].Start(queue);
            consumers[i].Start(queue);
        }

        Thread.Sleep(new TimeSpan(0, 0, 1, 0));

        for (int i = 0; i < size; i++)
        {
            producers[i].Abort();
            consumers[i].Abort();
        }

        Assert.AreEqual(0, _errors);
    }

    private void LoopProducer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                try
                {
                    q.Enqueue(new object());
                }
                catch
                { }

            }
        }
        catch (ThreadAbortException)
        { }
    }

    private void LoopConsumer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                object item = q.Dequeue();
                if (item == null) Interlocked.Increment(ref _errors);
            }
        }
        catch (ThreadAbortException)
        { }

    }
Run Code Online (Sandbox Code Playgroud)

一旦消费者线程获得null,就会计算错误.当使用1个生产者和1个消费者进行测试时,它会成功.当对2个生产者和2个或更多消费者进行测试时,会发生灾难:甚至检测到2000个泄漏.我发现问题可能出在Enqueue方法中.通过设计契约,生产者只能写入一个空的单元格(null),但是通过一些诊断修改我的代码我发现有时生产者试图在非空单元格上写入,然后被"好"所占据"数据.

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        T leak = Interlocked.Exchange(ref _array[index], item);

        //Diagnostic code
        if (leak != null)
        {
            throw new InvalidOperationException("Too bad...");
        }
        Interlocked.Increment(ref _count);

        _readSema.Release();
    }
Run Code Online (Sandbox Code Playgroud)

然后经常发生"太糟糕"的异常.但是从并发写入引发冲突太奇怪了,因为增量是原子的,而编写器的信号量只允许与自由数组单元一样多的编写器.

有人可以帮我吗?如果您与我分享您的技能和经验,我将非常感激.

谢谢.

Dan*_*Tao 6

我必须说,这让我觉得这是一个非常聪明的主意,在我开始意识到这里的错误(我认为)之前,我想了一会儿.所以,一方面,我们想出这样一个聪明的设计!但是,与此同时,对你展示" Kernighan定律" 感到羞耻:

调试的难度是首先编写代码的两倍.因此,如果您尽可能巧妙地编写代码,那么根据定义,您不够聪明,无法对其进行调试.

这个问题基本上是这样的:你的假设WaitOneRelease调用有效序列化所有的EnqueueDequeue操作; 但这并不是这里发生的事情.请记住,Semaphore该类用于限制访问资源的线程数,而不是确保特定的事件顺序.会发生什么之间的每个WaitOneRelease不能保证在同一个"线程级"发生的WaitOneRelease要求自己.

用文字解释这很棘手,所以让我试着提供一个视觉插图.

假设您的队列容量为8,看起来像这样(让我们0表示nullx代表一个对象):

[ x x x x x x x x ]

因此Enqueue被调用了8次并且队列已满.因此,您的_writeSema信号量将被阻止WaitOne,您的_readSema信号量将立即返回WaitOne.

现在让我们假设Dequeue在3个不同的线程上或多或少地同时调用它们.我们称之为T1,T2和T3.

在继续之前,让我为您的Dequeue实施应用一些标签,以供参考:

public T Dequeue()
{
    _readSema.WaitOne();                                   // A
    int index = Interlocked.Increment(ref _tail);          // B
    index %= _capacity;
    if (index < 0) index += _capacity;
    T ret = Interlocked.Exchange(ref _array[index], null); // C
    Interlocked.Decrement(ref _count);
    _writeSema.Release();                                  // D

    return ret;
}
Run Code Online (Sandbox Code Playgroud)

好的,所以T1,T2和T3都已经过了A点.然后为了简单起见,我们假设它们各自按顺序到达线B,因此T1的a index为0,T2的a index为1,T3的a index为2.

到现在为止还挺好.但是这里有问题:无法保证从这里开始,T1,T2和T3将以任何指定的顺序到达D.假设T3实际上超过了T1和T2,超过了C线(从而设置_array[2]null)并一直到达D线.

在此之后,_writeSema将发出信号,这意味着您的队列中有一个可用的插槽可以写入,对吗?但你的队列现在看起来像这样!

[ x x 0 x x x x x ]

因此,如果另一个线程同时出现调用Enqueue,它实际上将过去 _writeSema.WaitOne,递增_head并获得index0,即使插槽0不为空.这样做的结果是插槽0中的项目实际上可以被覆盖,在T1之前(还记得吗?)读取它.

要了解您的null值来自何处,您只需要可视化我刚才描述的过程的反向.也就是说,假设您的队列如下所示:

[ 0 0 0 0 0 0 0 0 ]

三个线程,T1,T2和T3,Enqueue几乎同时调用.T3 _head 最后增加,但插入其项目(at _array[2])并_readSema.Release 首先调用,导致发出信号,_readSema但队列看起来像:

[ 0 0 x 0 0 0 0 0 ]

因此,如果另一个线程同时调用Dequeue(在T1和T2完成它们的操作之前),它将过去_readSema.WaitOne,递增_tail并获得index0,即使插槽0 .

所以这是你的问题.至于解决方案,我目前没有任何建议.给我一些时间考虑一下......(我现在正在发布这个答案,因为它在我的脑海中很新鲜,我觉得它可能对你有帮助.)