线程构建模块parallel_bounded_queue —如何“关闭”它?

Mis*_*iev 3 queue multithreading tbb

我正在使用concurrent_bounded_queueIntel TBB 4.1 Update 3在生产者线程和使用者线程之间进行通信:

队列类有一个叫做方法abort,其抛出tbb::user_abort到上阻塞所有线程poppush队列实例的。两个线程之间的通信可能如下所示:

ConsThread | ProdThread
-----------+-------------
q.pop      |  get new data
(wait)     |  q.push
process    |  get new data
q.pop      |  no more data!
(wait)     |  q.abort
quit       |  quit
Run Code Online (Sandbox Code Playgroud)

不幸的是,即使在这个简单的示例中,我也不能使用它来可靠地关闭队列,因为如果某些使用者pop在调用之前没有完成对先前ped数据的处理abort,他们将完成迭代并返回阻塞pop

ConsThread | ProdThread
-----------+-------------
q.pop      |  get new data
(wait)     |  q.push
process    |  get new data
process    |  no more data!
process    |  q.abort
process    |  quit
process    |
q.pop      |
(wait)     |
(wait)     |
(wait)     |
(so lonely)|
Run Code Online (Sandbox Code Playgroud)

现在,我正在使用一个中等程度的令人讨厌的hack,它会产生另一个未分离的线程(该线程加入了使用者池线程),并等待它完成,同时abort不时向后继者发送更多s:

bool areConsumerThreadsJoinedThankYou = false;
std::thread joiner(Joiner(consumerPool, &areConsumerThreadsJoinedThankYou));

while (!areConsumerThreadsJoinedThankYou) {
    rawQueue.abort();
    MAGIC_MSLEEP(100);
}
Run Code Online (Sandbox Code Playgroud)

class Joiner的实现几乎是

void Joiner::operator()(void)
{
    for (auto it = this->m_threadPool.begin();
         it < this->m_threadPool.end();
         it++)
        (*it)->join();
    this->m_done = true;
    *(this->m_flag) = true;
}
Run Code Online (Sandbox Code Playgroud)

这当然很丑。还有更根本的解决方案吗?

Arc*_*son 5

创建一个指定的“ EndOfData”项。如果您知道有K个使用者,则在完成推送数据项后,让生产者推送K个“ EndOfData”项。让每个使用者在弹出“ EndOfData”项后退出。

如果事先不知道K,请让生产者推送单个“ EndOfData”项。然后让每个弹出“ EndOfData”项目的消费者在离开之前推送另一个“ EndOfData”项目。完成所有使用者后,将剩下一个“ EndOfData”项,该项将在销毁队列时销毁。