想象一下有两个线程的程序.它们运行以下代码(CAS指比较和交换):
// Visible to both threads
static int test;
// Run by thread A
void foo()
{
// Check if value is 'test' and swap in 0xdeadbeef
while(!CAS(&test, test, 0xdeadbeef)) {}
}
// Run by thread B
void bar()
{
while(1) {
// Perpetually atomically write rand() into the test variable
atomic_write(&test, rand());
}
}
Run Code Online (Sandbox Code Playgroud)
线程B是否有可能永久地导致线程A的CAS失败,从而永远不会将0xdeadbeef写入'test'?或者自然调度抖动是否意味着在实践中这种情况永远不会发生?如果在线程A的while循环中完成了一些工作怎么办?
我正在寻找符合这些要求的无锁设计:
我目前已经实现了一个包含这些结构的多个实例的ringbuffer; 但是这个实现受到以下事实的影响:当编写器使用了环形缓冲区中存在的所有结构时,没有更多的地方可以从结构中改变......但是,其余的环形缓冲区包含一些不必读取的数据由读者但不能被作者重复使用.因此,环形缓冲器不适合这个目的.
无锁设计的任何想法(名称或伪实现)?谢谢你考虑过这个问题.
我们一直希望在代码中使用无锁队列来减少当前实现中单个生产者和使用者之间的锁争用.那里有很多队列实现,但我还不太清楚如何最好地管理节点的内存管理.
例如,生产者看起来像这样:
queue.Add( new WorkUnit(...) );
Run Code Online (Sandbox Code Playgroud)
消费者看起来像:
WorkUnit* unit = queue.RemoveFront();
unit->Execute();
delete unit;
Run Code Online (Sandbox Code Playgroud)
我们目前使用内存池进行分配.您会注意到生产者分配内存并且消费者删除它.由于我们正在使用池,因此我们需要向内存池添加另一个锁以正确保护它.这似乎首先否定了无锁队列的性能优势.
到目前为止,我认为我们的选择是:
还有其他我们可以探索的选择吗?我们试图避免实现无锁内存池,但我们可能采取这种方式.
谢谢.
我在下面有两个版本的自旋锁.第一个使用默认值memory_order_cst,而后者使用memory_order_acquire/memory_order_release.由于后者更放松,我希望它有更好的表现.但似乎并非如此.
class SimpleSpinLock
{
public:
inline SimpleSpinLock(): mFlag(ATOMIC_FLAG_INIT) {}
inline void lock()
{
int backoff = 0;
while (mFlag.test_and_set()) { DoWaitBackoff(backoff); }
}
inline void unlock()
{
mFlag.clear();
}
private:
std::atomic_flag mFlag = ATOMIC_FLAG_INIT;
};
class SimpleSpinLock2
{
public:
inline SimpleSpinLock2(): mFlag(ATOMIC_FLAG_INIT) {}
inline void lock()
{
int backoff = 0;
while (mFlag.test_and_set(std::memory_order_acquire)) { DoWaitBackoff(backoff); }
}
inline void unlock()
{
mFlag.clear(std::memory_order_release);
}
private:
std::atomic_flag mFlag = ATOMIC_FLAG_INIT;
};
const int NUM_THREADS = 8;
const int NUM_ITERS = 5000000;
const …Run Code Online (Sandbox Code Playgroud) 我需要一个非常快(在某种意义上"读取器的低成本",而不是"低延迟")线程之间的更改通知机制,以便更新读取缓存:
情况
线程W(Writer)S仅在一段时间内更新数据结构()(在我的情况下是地图中的设置).
线程R(Reader)维护缓存S并且经常读取它.当线程W更新S线程R需要在合理的时间(10-100ms)内通知更新.
架构是ARM,x86和x86_64.我需要支持C++03gcc 4.6及更高版本.
码
是这样的:
// variables shared between threads
bool updateAvailable;
SomeMutex dataMutex;
std::string myData;
// variables used only in Thread R
std::string myDataCache;
// Thread W
SomeMutex.Lock();
myData = "newData";
updateAvailable = true;
SomeMutex.Unlock();
// Thread R
if(updateAvailable)
{
SomeMutex.Lock();
myDataCache = myData;
updateAvailable = false;
SomeMutex.Unlock();
}
doSomethingWith(myDataCache);
Run Code Online (Sandbox Code Playgroud)
我的问题
在线程R中,"快速路径"中没有发生锁定或障碍(无可用更新).这是一个错误吗?这个设计有什么后果?
我需要有资格updateAvailable的volatile?
最终 …
我一直在阅读内存障碍:软件黑客的硬件视图,这是Paul E. McKenney的一篇非常受欢迎的文章.
本文强调的一点是,像Alpha这样非常微弱有序的处理器可以重新排序依赖负载,这似乎是分区缓存的副作用
论文摘录:
1 struct el *insert(long key, long data)
2 {
3 struct el *p;
4 p = kmalloc(sizeof(*p), GPF_ATOMIC);
5 spin_lock(&mutex);
6 p->next = head.next;
7 p->key = key;
8 p->data = data;
9 smp_wmb();
10 head.next = p;
11 spin_unlock(&mutex);
12 }
13
14 struct el *search(long key)
15 {
16 struct el *p;
17 p = head.next;
18 while (p != &head) {
19 /* BUG ON ALPHA!!! */
20 if (p->key …Run Code Online (Sandbox Code Playgroud) synchronization locking cpu-architecture lock-free memory-barriers
我有一个类存储一些传入的实时数据的最新值(大约1.5亿事件/秒).
假设它看起来像这样:
class DataState
{
Event latest_event;
public:
//pushes event atomically
void push_event(const Event __restrict__* e);
//pulls event atomically
Event pull_event();
};
Run Code Online (Sandbox Code Playgroud)
我需要能够以原子方式推送事件并使用严格的排序保证来拉动它们.现在,我知道我可以使用自旋锁,但鉴于大量事件率(超过1亿/秒)和高度并发性,我更喜欢使用无锁操作.
问题是Event大小为64字节.CMPXCHG64B目前的X86 CPU 没有任何指令(截至2016年8月).因此,如果我使用std::atomic<Event>我必须链接到libatomic使用互联网下的互斥量(太慢).
所以我的解决方案是以原子方式交换指向值的指针.问题是动态内存分配成为这些事件率的瓶颈.所以...我定义了一些我称之为"环分配器"的东西:
/// @brief Lockfree Static short-lived allocator used for a ringbuffer
/// Elements are guaranteed to persist only for "size" calls to get_next()
template<typename T> class RingAllocator {
T *arena;
std::atomic_size_t arena_idx;
const std::size_t arena_size;
public:
/// @brief Creates a new RingAllocator
/// @param size The …Run Code Online (Sandbox Code Playgroud) c++ performance lock-free dynamic-memory-allocation data-structures
我在内存中遇到了内存回收的问题crossbeam.假设您正在实现一个包含单个值的简单线程安全无锁容器.任何线程都可以获取存储值的克隆,并且值可以在任何时候更新,之后读者开始观察新值的克隆.
虽然典型的用例是指定像Arc<X>T 这样的东西,但实现不能依赖于T指针大小 - 例如,X可能是一个特征,导致胖指针Arc<X>.但是对任意T的无锁访问似乎非常适合基于纪元的无锁代码.根据这些例子,我想出了这个:
extern crate crossbeam;
use std::thread;
use std::sync::atomic::Ordering;
use crossbeam::epoch::{self, Atomic, Owned};
struct Container<T: Clone> {
current: Atomic<T>,
}
impl<T: Clone> Container<T> {
fn new(initial: T) -> Container<T> {
Container { current: Atomic::new(initial) }
}
fn set_current(&self, new: T) {
let guard = epoch::pin();
let prev = self.current.swap(Some(Owned::new(new)),
Ordering::AcqRel, &guard);
if let Some(prev) = prev {
unsafe {
// once swap has propagated, *PREV will no …Run Code Online (Sandbox Code Playgroud) 由于std::atomic::is_lock_free()可能没有真正反映现实[ 参考 ],我正在考虑编写一个真正的运行时测试.然而,当我开始研究它时,我发现这不是一个我认为不可能完成的微不足道的任务.我想知道是否有一些聪明的想法可以做到这一点.
当我无法弄清楚为什么需要特定的内存屏障时,我一直在本网站上查看无锁的单生产者/单消费者循环缓冲区。我已经仔细阅读了数百次关于内存顺序的标准规则,但我不明白我错过了什么。
通过这种实现,只有一个可以调用该push()函数的唯一线程和另一个可以调用该函数的唯一线程pop()。
这是Producer代码:
bool push(const Element& item)
{
const auto current_tail = _tail.load(std::memory_order_relaxed); //(1)
const auto next_tail = increment(current_tail);
if(next_tail != _head.load(std::memory_order_acquire)) //(2)
{
_array[current_tail] = item; //(3)
_tail.store(next_tail, std::memory_order_release); //(4)
return true;
}
return false; // full queue
}
Run Code Online (Sandbox Code Playgroud)
这是Consumer代码:
bool pop(Element& item)
{
const auto current_head = _head.load(std::memory_order_relaxed); //(1)
if(current_head == _tail.load(std::memory_order_acquire)) //(2)
return false; // empty queue
item = _array[current_head]; //(3)
_head.store(increment(current_head), std::memory_order_release); //(4)
return …Run Code Online (Sandbox Code Playgroud) lock-free ×10
c++ ×7
atomic ×3
c++11 ×3
performance ×2
c++03 ×1
concurrency ×1
locking ×1
real-time ×1
rust ×1