Tho*_*ard 2 c++ concurrency pthreads
我正在编写一个线程应用程序,它将处理一个资源列表,可能会也可能不会将结果项放在每个资源的容器(std :: map)中.资源处理在多个线程中进行.
结果容器将被遍历并且每个项目由一个单独的线程操作,该线程接受一个项目并更新MySQL数据库(使用mysqlcppconn API),然后从容器中移除该项目并继续.
为了简单起见,这里是逻辑的概述:
queueWorker() - thread
getResourcesList() - seeds the global queue
databaseWorker() - thread
commitProcessedResources() - commits results to a database every n seconds
processResources() - thread x <# of processor cores>
processResource()
queueResultItem()
Run Code Online (Sandbox Code Playgroud)
和伪实现来展示我正在做的事情.
/* not the actual stucts, but just for simplicities sake */
struct queue_item_t {
int id;
string hash;
string text;
};
struct result_item_t {
string hash; // hexadecimal sha1 digest
int state;
}
std::map< string, queue_item_t > queue;
std::map< string, result_item_t > results;
bool processResource (queue_item_t *item)
{
result_item_t result;
if (some_stuff_that_doesnt_apply_to_all_resources)
{
result.hash = item->hash;
result.state = 1;
/* PROBLEM IS HERE */
queueResultItem(result);
}
}
void commitProcessedResources ()
{
pthread_mutex_lock(&resultQueueMutex);
// this can take a while since there
for (std::map< string, result_item_t >::iterator it = results.begin; it != results.end();)
{
// do mysql stuff that takes a while
results.erase(it++);
}
pthread_mutex_unlock(&resultQueueMutex);
}
void queueResultItem (result_item_t result)
{
pthread_mutex_lock(&resultQueueMutex);
results.insert(make_pair(result.hash, result));
pthread_mutex_unlock(&resultQueueMutex);
}
Run Code Online (Sandbox Code Playgroud)
正如processResource()所示,问题在于,当commitProcessedResources()正在运行且resultQueueMutex被锁定时,我们将在此等待queueResultItem()返回,因为它将尝试锁定相同的互斥锁,因此将等待直到它完成,这可能需要一段时间.
显然,由于存在有限数量的线程运行,所以一旦所有线程都在等待queueResultItem()完成,在发布互斥锁并且可用于queueResultItem()之前,不会再做任何工作.
所以,我的问题是我最好如何实现这一点?是否存在可以同时插入和删除的特定类型的标准容器,或者是否存在我不知道的东西?
它不是严格必要的,每个队列项目可以有它自己独特的密钥在这里用的std ::地图是这样,但我宁愿它,因为很多资源,可以产生相同的结果,我宁愿只发送一个唯一的结果到数据库,即使它确实使用INSERT IGNORE忽略任何重复.
我不太熟悉C++,所以我不知道在谷歌上要找什么.:(
在处理期间,您不必一直保持队列的锁定commitProcessedResources ().您可以使用空的队列交换队列:
void commitProcessedResources ()
{
std::map< string, result_item_t > queue2;
pthread_mutex_lock(&resultQueueMutex);
// XXX Do a quick swap.
queue2.swap (results);
pthread_mutex_unlock(&resultQueueMutex);
// this can take a while since there
for (std::map< string, result_item_t >::iterator it = queue2.begin();
it != queue2.end();)
{
// do mysql stuff that takes a while
// XXX You do not need this.
//results.erase(it++);
}
}
Run Code Online (Sandbox Code Playgroud)