生产者完成后通知消费者的优雅方式?

Naw*_*waz 5 c++ queue concurrency multithreading producer-consumer

我正在实现一个concurrent_blocking_queue具有最小功能:

//a thin wrapper over std::queue
template<typename T>
class concurrent_blocking_queue
{
    std::queue<T> m_internal_queue;
     //...
 public:
     void add(T const & item);
     T&   remove();
     bool empty();
};
Run Code Online (Sandbox Code Playgroud)

我打算将它用于生产者 - 消费者问题(我猜,它是使用这种数据结构的地方吗?).但我坚持一个问题是:

生产者完成后如何优雅地通知消费者?生产者完成后如何通知队列?通过调用特定的成员函数,比方说done()?从队列中抛出异常(即从remove函数中)是一个好主意吗?

我遇到了很多例子,但都有无限循环,好像生产者会永远生产物品.没有讨论停止条件的问题,甚至没有讨论维基文章.

fox*_*337 5

我过去简单介绍了一个虚拟的"完成"产品.因此,如果生产者可以创建类型A和类型B的"产品",我发明了"完成"类型.当消费者遇到"完成"类型的产品时,它知道不再需要进一步处理.


Jam*_*nze 1

我的队列通常使用指针(std::auto_ptr在接口中使用 ,以明确指示发送者可能不再访问指针);在大多数情况下,排队的对象是多态的,因此无论如何都需要动态分配和引用语义。否则,向队列添加“文件结束”标志应该不会太困难。您需要在生产者端有一个特殊的函数(close?)来设置它(使用与写入队列时完全相同的锁定原语),并且删除函数中的循环必须等待某个东西出现,或者要关闭的队列。当然,你需要返回一个Fallible值,以便读者知道读取是否成功。另外,不要忘记在这种情况下,您需要一个notify_all确保所有等待该条件的进程都被唤醒。

顺便说一句:我不太明白你的界面是如何实现的。T&返回的 byremove指的是什么。基本上,remove必须是这样的:

Fallible<T>
MessageQueue<T>::receive()
{
    ScopedLock l( myMutex );
    while ( myQueue.empty() && ! myIsDone )
        myCondition.wait( myMutex );
    Fallible<T> results;
    if ( !myQueue.empty() ) {
        results.validate( myQueue.top() );
        myQueue.pop();
    }
    return results;
}
Run Code Online (Sandbox Code Playgroud)

即使没有myIsDone条件,您也必须先将值读入局部变量,然后再将其从队列中删除,并且无法返回对局部变量的引用。

对于其余的:

void
MessageQueue<T>::send( T const& newValue )
{
    ScopedLock l( myMutex );
    myQueue.push( newValue );
    myCondition.notify_all();
}

void
MessageQueue<T>::close()
{
    ScopedLock l( myMutex );
    myIsDone = true;
    myCondition.notify_all();
}
Run Code Online (Sandbox Code Playgroud)