std :: map insert/erase的并发问题

Tho*_*ard 2 c++ concurrency pthreads

我正在编写一个线程应用程序,它将处理一个资源列表,可能会也可能不会将结果项放在每个资源的容器(std :: map)中.资源处理在多个线程中进行.

结果容器将被遍历并且每个项目由一个单独的线程操作,该线程接受一个项目并更新MySQL数据库(使用mysqlcppconn API),然后从容器中移除该项目并继续.

为了简单起见,这里是逻辑的概述:

queueWorker() - thread
    getResourcesList() - seeds the global queue

databaseWorker() - thread
    commitProcessedResources() - commits results to a database every n seconds

processResources() - thread x <# of processor cores>
    processResource()
    queueResultItem()
Run Code Online (Sandbox Code Playgroud)

和伪实现来展示我正在做的事情.

/* not the actual stucts, but just for simplicities sake */
struct queue_item_t {
    int id;
    string hash;
    string text;
};

struct result_item_t {
    string hash; // hexadecimal sha1 digest
    int state;
}

std::map< string, queue_item_t > queue;
std::map< string, result_item_t > results;

bool processResource (queue_item_t *item)
{
    result_item_t result;

    if (some_stuff_that_doesnt_apply_to_all_resources)
    {
        result.hash = item->hash;
        result.state = 1;

        /* PROBLEM IS HERE */
        queueResultItem(result);
    }
}

void commitProcessedResources ()
{
    pthread_mutex_lock(&resultQueueMutex);

    // this can take a while since there

    for (std::map< string, result_item_t >::iterator it = results.begin; it != results.end();)
    {
        // do mysql stuff that takes a while

        results.erase(it++);
    }

    pthread_mutex_unlock(&resultQueueMutex);
}

void queueResultItem (result_item_t result)
{
    pthread_mutex_lock(&resultQueueMutex);

    results.insert(make_pair(result.hash, result));

    pthread_mutex_unlock(&resultQueueMutex);
}
Run Code Online (Sandbox Code Playgroud)

正如processResource()所示,问题在于,当commitProcessedResources()正在运行且resultQueueMutex被锁定时,我们将在此等待queueResultItem()返回,因为它将尝试锁定相同的互斥锁,因此将等待直到它完成,这可能需要一段时间.

显然,由于存在有限数量的线程运行,所以一旦所有线程都在等待queueResultItem()完成,在发布互斥锁并且可用于queueResultItem()之前,不会再做任何工作.

所以,我的问题是我最好如何实现这一点?是否存在可以同时插入和删除的特定类型的标准容器,或者是否存在我不知道的东西?

它不是严格必要的,每个队列项目可以有它自己独特的密钥在这里用的std ::地图是这样,但我宁愿它,因为很多资源,可以产生相同的结果,我宁愿只发送一个唯一的结果到数据库,即使它确实使用INSERT IGNORE忽略任何重复.

我不太熟悉C++,所以我不知道在谷歌上要找什么.:(

wil*_*ilx 7

在处理期间,您不必一直保持队列的锁定commitProcessedResources ().您可以使用空的队列交换队列:

void commitProcessedResources ()
{
    std::map< string, result_item_t > queue2;
    pthread_mutex_lock(&resultQueueMutex);
    // XXX Do a quick swap.
    queue2.swap (results);
    pthread_mutex_unlock(&resultQueueMutex);

    // this can take a while since there

    for (std::map< string, result_item_t >::iterator it = queue2.begin();
        it != queue2.end();)
    {
        // do mysql stuff that takes a while

        // XXX You do not need this.
        //results.erase(it++);
    }   
}
Run Code Online (Sandbox Code Playgroud)