如何使用BlockingCollection <>解决生产者/消费者竞争条件

Rob*_*los 4 .net c# multithreading thread-safety

我正在实现一个记录器,它将记录写入数据库.为了防止数据库写入阻塞调用记录器的代码,我已经将数据库访问移动到一个单独的线程,使用基于生产者/消费者模型实现BlockingCollection<string>.

这是简化的实现:

abstract class DbLogger : TraceListener
{
    private readonly BlockingCollection<string> _buffer;
    private readonly Task _writerTask;

    DbLogger() 
    {
        this._buffer = new BlockingCollection<string>(new ConcurrentQueue<string>(), 1000);
        this._writerTask = Task.Factory.StartNew(this.ProcessBuffer, TaskCreationOptions.LongRunning);
    }

    // Enqueue the msg.
    public void LogMessage(string msg) { this._buffer.Add(msg); }

    private void ProcessBuffer()
    {
        foreach (string msg in this._buffer.GetConsumingEnumerable())
        {
            this.WriteToDb(msg);
        }
    }

    protected abstract void WriteToDb(string msg);

    protected override void Dispose(bool disposing) 
    { 
        if (disposing) 
        {
            // Signal to the blocking collection that the enumerator is done.
            this._buffer.CompleteAdding();

            // Wait for any in-progress writes to finish.
            this._writerTask.Wait(timeout);

            this._buffer.Dispose(); 
        }
        base.Dispose(disposing); 
    }
}
Run Code Online (Sandbox Code Playgroud)

现在,当我的应用程序关闭时,我需要确保在数据库连接断开之前刷新缓冲区.否则,WriteToDb将抛出异常.

所以,这是我天真的Flush实现:

public void Flush()
{
    // Sleep until the buffer is empty.
    while(this._buffer.Count > 0)
    {
        Thread.Sleep(50);
    }
}
Run Code Online (Sandbox Code Playgroud)

此实现的问题是以下事件序列:

  1. 缓冲区中有一个条目.
  2. 在记录线程,MoveNext()被称为在枚举,所以我们现在的身体是ProcessBufferforeach循环.
  3. Flush()由主线程调用.它看到该集合为空,因此立即返回.
  4. 主线程关闭数据库连接.
  5. 回到日志记录线程中,foreach循环体开始执行.WriteToDb被调用,因为数据库连接已关闭而失败.

所以,我的下一次尝试是添加一些标志,如下所示:

private volatile bool _isWritingBuffer = false;
private void ProcessBuffer()
{
    foreach (string msg in this._buffer.GetConsumingEnumerable())
    {
        lock (something) this._isWritingBuffer = true;
        this.WriteToDb(msg);
        lock (something) this._isWritingBuffer = false;
    }
}

public void Flush()
{
    // Sleep until the buffer is empty.
    bool isWritingBuffer;
    lock(something) isWritingBuffer = this._isWritingBuffer;
    while(this._buffer.Count > 0 || isWritingBuffer)
    {
        Thread.Sleep(50);
    }
}
Run Code Online (Sandbox Code Playgroud)

但是,仍然存在竞争条件,因为整个Flush()方法可以在集合为空之后但在_isWritingBuffer设置之前执行true.

如何修复我的Flush实现以避免这种竞争条件?

注意:由于各种原因,我必须从头开始编写记录器,所以请不要回答我使用现有记录框架的建议.

Pra*_*eek 5

首先永远不要锁定公共对象,尤其是this.

此外,永远不要使用裸布线进行同步:如果您想了解可能出现的问题,请查看我的博客:同步,内存可见性和漏洞抽象 :)

关于这个问题本身我一定会遗漏一些东西,但为什么你需要这样的Flush方法呢?

实际上,当您完成日志记录后,您将通过Dispose从主线程调用其方法来处置记录器.

并且您已经以这样的方式实现它,它将等待"写入DB"任务.

如果我错了,你真的需要与另一个原语同步,那么你应该使用一个事件:

DbLogger:

public ManualResetEvent finalizing { get; set; }

public void Flush()
{
    finalizing.WaitOne();
}
Run Code Online (Sandbox Code Playgroud)

在某个地方,例如在ProcessBuffer你完成写入DB时通知你:

finalizing.Set();
Run Code Online (Sandbox Code Playgroud)