在unordered_map(C++)中并发写入不同的桶?

Tom*_*Tom 20 c++ concurrency multithreading c++11

C++新手在这里.我正在尝试在unordered_map中同时写入不同的存储桶.从我通过搜索可以看出,我的理解是这应该是一个线程安全的操作.我(也许是不正确的)理解是基于这里这里的答案,以及C++ 11标准的引用部分(特别是第2项 - 强调我的):

23.2.2集装箱数据竞赛[container.requirements.dataraces]

1为避免数据争用(17.6.5.9),实现应考虑以下函数为const:begin,end,rbegin,rend,front,back,data,find,lower_bound,upper_bound,equal_range,at和,除了关联或无序关联容器,operator [].

2尽管如此(17.6.5.9),除了vector<bool>同时修改同一序列中不同元素中包含对象的内容时,还需要实现以避免数据争用.

3 [注意:对于大小大于1的向量x,x [1] = 5和*x.begin()= 10可以在没有数据争用的情况下同时执行,但是x [0] = 5和*x.并发执行的begin()= 10可能导致数据竞争.作为一般规则的例外,对于向量<bool> y,y [0] = true可以与y [1] = true竞争. - 尾注]

在任何情况下,使用标准容器写入不同的桶似乎不是线程安全的,如下面的代码所示.您会看到我在写入之前启用了与正在修改的存储区相对应的锁定,但有时无法正确记录对.对于它的价值,如果我使用单个锁 - 例如,只需更改auto bkt = mm->bucket(key);auto bkt=0;,有效地锁定整个unordered_map容器 - 一切都按预期工作.

#include <iostream>
#include <unordered_map>
#include <atomic>
#include <vector>
#include <thread>

#define NUM_LOCKS 409
#define N 100
#define NUM_THREADS 2

using namespace std;


class SpinLock
{
    public:
        void lock()
        {
            while(lck.test_and_set(memory_order_acquire)){}
        }
    void unlock()
        {
            lck.clear(memory_order_release);
        }

    private:
        atomic_flag lck = ATOMIC_FLAG_INIT;
};


vector<SpinLock> spinLocks(NUM_LOCKS);


void add_to_map(unordered_map<int,int> * mm, const int keyStart, const int keyEnd, const int tid){

    for(int key=keyStart;key<keyEnd;++key){
        auto bkt = mm->bucket(key);

        //lock bucket
        spinLocks[bkt].lock();

        //insert pair
        mm->insert({key,tid});

        //unlock bucket
        spinLocks[bkt].unlock();
    }

}


int main() {

    int Nbefore, Nafter;
    thread *t = new thread[NUM_THREADS];

    //create an unordered map, and reserve enough space to avoid a rehash
    unordered_map<int,int> my_map;
    my_map.reserve(2*NUM_THREADS*N);

    //count number of buckets to make sure that a rehash didn't occur
    Nbefore=my_map.bucket_count();


    // Launch NUM_THREADS threads.  Thread k adds keys k*N through (k+1)*N-1 to the hash table, all with associated value = k.

    for(int threadID=0;threadID<NUM_THREADS;++threadID){
        t[threadID]=thread(add_to_map,&my_map,threadID*N,(threadID+1)*N,threadID);
    }

    // Wait for the threads to finish
    for(int threadID=0;threadID<NUM_THREADS;++threadID){
        t[threadID].join();
    }

    //count number of buckets to make sure that a rehash didn't occur
    Nafter=my_map.bucket_count();


    cout << "Number of buckets before adding elements: " << Nbefore <<endl;
    cout << "Number of buckets after  adding elements: " << Nafter  << " <--- same as above, so rehash didn't occur" <<endl;

    //see if any keys are missing
    for(int key=0;key<NUM_THREADS*N;++key){

        if(!my_map.count(key)){

            cout << "key " << key << " not found!" << endl;

        }
    }

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

当错误地未输入密钥时,程序将退出.示例输出是:

Number of buckets before adding elements: 401
Number of buckets after  adding elements: 401 <--- same as above, so rehash didn't occur
key 0 not found!
key 91 not found!
key 96 not found!
key 97 not found!
key 101 not found!
key 192 not found!
key 193 not found!
key 195 not found!
Run Code Online (Sandbox Code Playgroud)

所以,我的问题是双重的:

  1. 我是如何锁定水桶的?
  2. 如果是这样,是否有更好的方法可以逐桶锁定映射以启用对不同存储桶的并发写入?

最后,我会提到我已经尝试过TBB的concurrent_unordered_map,但是在我的应用程序中它比在串行中做事要慢得多.除了杂散错误之外,使用std :: unordered_map的桶锁定方法表现得更好.

Yak*_*ont 1

容器的元素不是桶,而是元素value_type

修改std容器中的一个元素不会对其他元素产生并发影响。但修改一个存储桶并没有这样的保证。

在存储桶中添加或删除元素是对容器的非操作,该非操作不属于无需同步即可安全使用的const特殊非操作列表。const