mar*_*hon 7 c++ concurrency multithreading atomic
所以我试图创建写入时复制映射,它使用读取端的原子引用计数尝试来锁定.
有些事情不太对劲.我看到一些引用过度增加,有些引用下降,所以某些东西不是真正的原子.在我的测试中,我有10个读取器线程循环100次,每次执行get()和1个写入器线程执行100次写入.
它在作者中陷入困境,因为有些引用永远不会降到零,即使它们应该也是如此.
我正在尝试使用本博客解释的128位DCAS技术.
这有什么明显的错误或是否有更简单的方法来调试它而不是在调试器中使用它?
typedef std::unordered_map<std::string, std::string> StringMap;
static const int zero = 0; //provides an l-value for asm code
class NonBlockingReadMapCAS {
public:
class OctaWordMapWrapper {
public:
StringMap* fStringMap;
//std::atomic<int> fCounter;
int64_t fCounter;
OctaWordMapWrapper(OctaWordMapWrapper* copy) : fStringMap(new StringMap(*copy->fStringMap)), fCounter(0) { }
OctaWordMapWrapper() : fStringMap(new StringMap), fCounter(0) { }
~OctaWordMapWrapper() {
delete fStringMap;
}
/**
* Does a compare and swap on an octa-word - in this case, our two adjacent class members fStringMap
* pointer and fCounter.
*/
static bool inline doubleCAS(OctaWordMapWrapper* target, StringMap* compareMap, int64_t compareCounter, StringMap* swapMap, int64_t swapCounter ) {
bool cas_result;
__asm__ __volatile__
(
"lock cmpxchg16b %0;" // cmpxchg16b sets ZF on success
"setz %3;" // if ZF set, set cas_result to 1
: "+m" (*target),
"+a" (compareMap), //compare target's stringmap pointer to compareMap
"+d" (compareCounter), //compare target's counter to compareCounter
"=q" (cas_result) //results
: "b" (swapMap), //swap target's stringmap pointer with swapMap
"c" (swapCounter) //swap target's counter with swapCounter
: "cc", "memory"
);
return cas_result;
}
OctaWordMapWrapper* atomicIncrementAndGetPointer()
{
if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1))
return this;
else
return NULL;
}
OctaWordMapWrapper* atomicDecrement()
{
while(true) {
if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter -1))
break;
}
return this;
}
bool atomicSwapWhenNotReferenced(StringMap* newMap)
{
return doubleCAS(this, this->fStringMap, zero, newMap, 0);
}
}
__attribute__((aligned(16)));
std::atomic<OctaWordMapWrapper*> fReadMapReference;
pthread_mutex_t fMutex;
NonBlockingReadMapCAS() {
fReadMapReference = new OctaWordMapWrapper();
}
~NonBlockingReadMapCAS() {
delete fReadMapReference;
}
bool contains(const char* key) {
std::string keyStr(key);
return contains(keyStr);
}
bool contains(std::string &key) {
OctaWordMapWrapper *map;
do {
map = fReadMapReference.load()->atomicIncrementAndGetPointer();
} while (!map);
bool result = map->fStringMap->count(key) != 0;
map->atomicDecrement();
return result;
}
std::string get(const char* key) {
std::string keyStr(key);
return get(keyStr);
}
std::string get(std::string &key) {
OctaWordMapWrapper *map;
do {
map = fReadMapReference.load()->atomicIncrementAndGetPointer();
} while (!map);
//std::cout << "inc " << map->fStringMap << " cnt " << map->fCounter << "\n";
std::string value = map->fStringMap->at(key);
map->atomicDecrement();
return value;
}
void put(const char* key, const char* value) {
std::string keyStr(key);
std::string valueStr(value);
put(keyStr, valueStr);
}
void put(std::string &key, std::string &value) {
pthread_mutex_lock(&fMutex);
OctaWordMapWrapper *oldWrapper = fReadMapReference;
OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
std::pair<std::string, std::string> kvPair(key, value);
newWrapper->fStringMap->insert(kvPair);
fReadMapReference.store(newWrapper);
std::cout << oldWrapper->fCounter << "\n";
while (oldWrapper->fCounter > 0);
delete oldWrapper;
pthread_mutex_unlock(&fMutex);
}
void clear() {
pthread_mutex_lock(&fMutex);
OctaWordMapWrapper *oldWrapper = fReadMapReference;
OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
fReadMapReference.store(newWrapper);
while (oldWrapper->fCounter > 0);
delete oldWrapper;
pthread_mutex_unlock(&fMutex);
}
};
Run Code Online (Sandbox Code Playgroud)
也许不是答案,但这对我来说看起来很可疑:
while (oldWrapper->fCounter > 0);
delete oldWrapper;
Run Code Online (Sandbox Code Playgroud)
当计数器为 0 时,您可以让读取器线程刚刚进入atomicIncrementAndGetPointer(),从而通过删除包装器将地毯拉到读取器线程下方。
编辑总结以下评论以获取潜在的解决方案:
我知道的最好的实现是fCounter从移动OctaWordMapWrapper到(实际上你根本fReadMapReference不需要这个类)。OctaWordMapWrapper当计数器为零时,交换写入器中的指针。因为读取器线程可能存在高争用,这实际上会无限期地阻塞写入器fCounter,所以您可以为读取器锁分配最高位,即,当设置该位时,读取器旋转直到该位被清除。编写器__sync_fetch_and_or()在要更改指针时设置该位( ),等待计数器降至零(即现有读取器完成其工作),然后交换指针并清除该位。
这种方法应该是防水的,尽管它显然会在写入时阻止读取器。我不知道这在您的情况下是否可以接受,理想情况下您希望它是非阻塞的。
代码看起来像这样(未经测试!):
class NonBlockingReadMapCAS
{
public:
NonBlockingReadMapCAS() :m_ptr(0), m_counter(0) {}
private:
StringMap *acquire_read()
{
while(1)
{
uint32_t counter=atom_inc(m_counter);
if(!(counter&0x80000000))
return m_ptr;
atom_dec(m_counter);
while(m_counter&0x80000000);
}
return 0;
}
void release_read()
{
atom_dec(m_counter);
}
void acquire_write()
{
uint32_t counter=atom_or(m_counter, 0x80000000);
assert(!(counter&0x80000000));
while(m_counter&0x7fffffff);
}
void release_write()
{
atom_and(m_counter, uint32_t(0x7fffffff));
}
StringMap *volatile m_ptr;
volatile uint32_t m_counter;
};
Run Code Online (Sandbox Code Playgroud)
只需在访问指针进行读/写之前和之后调用 acquire/release_read/write() 即可。分别替换atom_inc/dec/or/and()为__sync_fetch_and_add()、__sync_fetch_and_sub()、__sync_fetch_and_or()和__sync_fetch_and_and()。实际上你不需要doubleCAS()这个。
正如@Quuxplusone 在下面的评论中正确指出的那样,这是单个生产者和多个消费者的实现。我修改了代码以正确断言以强制执行此操作。