alf*_*out 6 c++ performance lock-free dynamic-memory-allocation data-structures
我有一个类存储一些传入的实时数据的最新值(大约1.5亿事件/秒).
假设它看起来像这样:
class DataState
{
Event latest_event;
public:
//pushes event atomically
void push_event(const Event __restrict__* e);
//pulls event atomically
Event pull_event();
};
Run Code Online (Sandbox Code Playgroud)
我需要能够以原子方式推送事件并使用严格的排序保证来拉动它们.现在,我知道我可以使用自旋锁,但鉴于大量事件率(超过1亿/秒)和高度并发性,我更喜欢使用无锁操作.
问题是Event大小为64字节.CMPXCHG64B目前的X86 CPU 没有任何指令(截至2016年8月).因此,如果我使用std::atomic<Event>我必须链接到libatomic使用互联网下的互斥量(太慢).
所以我的解决方案是以原子方式交换指向值的指针.问题是动态内存分配成为这些事件率的瓶颈.所以...我定义了一些我称之为"环分配器"的东西:
/// @brief Lockfree Static short-lived allocator used for a ringbuffer
/// Elements are guaranteed to persist only for "size" calls to get_next()
template<typename T> class RingAllocator {
T *arena;
std::atomic_size_t arena_idx;
const std::size_t arena_size;
public:
/// @brief Creates a new RingAllocator
/// @param size The number of elements in the underlying arena. Make this large enough to avoid overwriting fresh data
RingAllocator<T>(std::size_t size) : arena_size(size)
{
//allocate pool
arena = new T[size];
//zero out pool
std::memset(arena, 0, sizeof(T) * size);
arena_idx = 0;
}
~RingAllocator()
{
delete[] arena;
}
/// @brief Return next element's pointer. Thread-safe
/// @return pointer to next available element
T *get_next()
{
return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
}
};
Run Code Online (Sandbox Code Playgroud)
然后我可以让我的DataState类看起来像这样:
class DataState
{
std::atomic<Event*> latest_event;
RingAllocator<Event> event_allocator;
public:
//pushes event atomically
void push_event(const Event __restrict__* e)
{
//store event
Event *new_ptr = event_allocator.get_next()
*new_ptr = *e;
//swap event pointers
latest_event.store(new_ptr, std::memory_order_release);
}
//pulls event atomically
Event pull_event()
{
return *(latest_event.load(std::memory_order_acquire));
}
};
Run Code Online (Sandbox Code Playgroud)
只要我将环分配器的大小调整为可以同时调用函数的最大线程数,就不存在覆盖pull_event可能返回的数据的风险.加上一切都是超级本地化的,因此间接不会导致错误的缓存性能.这种方法有任何可能的陷阱吗?
DataState:我以为它会是一个堆栈或队列,但事实并非如此,所以push/pull似乎不是一个好的方法名称。(否则实现完全是假的)。
它只是一个锁存器,可让您读取任何线程存储的最后一个事件。
没有什么可以阻止连续两次写入覆盖从未读取过的元素。也没有什么可以阻止您两次阅读同一元素。
如果您只需要在某个地方复制小数据块,那么环形缓冲区似乎是一个不错的方法。但如果你不想失去事件,我认为你不能这样使用它。相反,只需获取一个环形缓冲区条目,然后复制到其中并在那里使用它。因此,唯一的原子操作应该是递增环形缓冲区位置索引。
你可以变得get_next()更有效率。此行执行原子后递增 (fetch_add) 和原子交换:
return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
Run Code Online (Sandbox Code Playgroud)
我什至不确定它是否安全,因为 xchg 可能会从另一个线程踩到 fetch_add 。无论如何,即使安全,也并不理想。
你不需要那个。确保 arena_size 始终是 2 的幂,然后就不需要对共享计数器取模。您可以放开它,让每个线程对其取模以供自己使用。它最终会换行,但它是一个二进制整数,因此它将以 2 的幂换行,这是竞技场大小的倍数。
我建议存储 AND 掩码而不是大小,这样就不存在编译%为指令以外的任何内容的风险and,即使它不是编译时常量。这可以确保我们避免使用 64 位整数div指令。
template<typename T> class RingAllocator {
T *arena;
std::atomic_size_t arena_idx;
const std::size_t size_mask; // maybe even make this a template parameter?
public:
RingAllocator<T>(std::size_t size)
: arena_idx(0), size_mask(size-1)
{
// verify that size is actually a power of two, so the mask is all-ones in the low bits, and all-zeros in the high bits.
// so that i % size == i & size_mask for all i
...
}
...
T *get_next() {
size_t idx = arena_idx.fetch_add(1, std::memory_order_relaxed); // still atomic, but we don't care which order different threads take blocks in
idx &= size_mask; // modulo our local copy of the idx
return &arena[idx];
}
};
Run Code Online (Sandbox Code Playgroud)
calloc如果使用new + memset 而不是,分配 arena 会更有效。操作系统在将页面提供给用户空间进程之前已经将其清零(以防止信息泄漏),因此将它们全部写入只是浪费工作。
arena = new T[size];
std::memset(arena, 0, sizeof(T) * size);
// vs.
arena = (T*)calloc(size, sizeof(T));
Run Code Online (Sandbox Code Playgroud)
自己编写页面确实会导致它们出错,因此它们都连接到真实的物理页面,而不仅仅是系统范围共享物理零页面的写时复制映射(就像它们在 new/malloc/calloc 之后)。在 NUMA 系统上,选择的物理页面可能取决于哪个线程实际接触该页面,而不是哪个线程进行分配。但由于您要重用该池,因此第一个写入页面的核心可能不是最终使用它最多的核心。
也许需要在微基准/性能计数器中寻找一些东西。
| 归档时间: |
|
| 查看次数: |
431 次 |
| 最近记录: |