使用 alignas 防止错误共享被破坏

Mat*_*iri 6 c++ multithreading x86-64 memory-alignment false-sharing

我不习惯在互联网上发布任何问题,所以如果我做错了什么,请告诉我。

简而言之

  1. 如何在 CPU 缓存行大小为 64 字节的 64 位架构上正确防止错误共享?

  2. C++ 'alignas' 关键字和简单字节数组(例如:char[64])的使用如何影响多线程效率?

语境

在研究Single Consumer Single Producer Queue的非常有效的实现时,我在对我的代码进行基准测试时遇到了 GCC 编译器的不合逻辑行为。

全文

我希望有人有必要的知识来解释正在发生的事情。

我目前在 arch linux 上使用 GCC 10.2.0 及其 C++ 20 实现。我的笔记本电脑是带有 i7-7500U 处理器的联想 T470S。

让我从数据结构开始:

class SPSCQueue
{
public:
    ...

private:
    alignas(64) std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
    char _pad0[64 - sizeof(Buffer) - sizeof(std::size_t)]; // 64 bytes alignment padding

    alignas(64) std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer2
    std::size_t _tailCache { 0 }; // Head cache for the consumer
    char _pad1[64 - sizeof(Buffer) - sizeof(std::size_t)]; // 64 bytes alignment padding
};
Run Code Online (Sandbox Code Playgroud)

以下数据结构在我的系统上推送/弹出时获得了快速而稳定的 20ns。

但是,使用以下成员更改对齐方式会使基准测试不稳定并在 20 到 30ns 之间。

    alignas(64) std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    struct alignas(64) {
        Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
        std::size_t _headCache { 0 }; // Head cache for the producer
    };

    alignas(64) std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    struct alignas(64) {
        Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer1
        std::size_t _tailCache { 0 }; // Tail cache for the consumer
    };
Run Code Online (Sandbox Code Playgroud)

最后,当我尝试这个配置给我 40 到 55ns 的结果时,我迷失了更多。


    std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    char _pad0[64 - sizeof(std::atomic<size_t>)];
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
    char _pad1[64 - sizeof(Buffer) - sizeof(std::size_t)];

    std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    char _pad2[64 - sizeof(std::atomic<size_t>)];
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer2
    std::size_t _tailCache { 0 }; // Head cache for the consumer
    char _pad3[64 - sizeof(Buffer) - sizeof(std::size_t)];
Run Code Online (Sandbox Code Playgroud)

这次我让队列推送/弹出在 40 到 55ns 之间振荡。

在这一点上我很迷茫,因为我不知道我应该去哪里寻找答案。到目前为止,C++ 内存布局对我来说非常直观,但我意识到我仍然错过了在高频多线程方面做得更好的非常重要的知识。

最少的代码示例

如果你想编译整个代码来自己测试,这里需要几个文件:

SPSCQueue.hpp:


#pragma once

#include <atomic>
#include <cstdlib>
#include <cinttypes>

#define KF_ALIGN_CACHELINE alignas(kF::Core::Utils::CacheLineSize)

namespace kF::Core
{
    template<typename Type>
    class SPSCQueue;

    namespace Utils
    {
        /** @brief Helper used to perfect forward move / copy constructor */
        template<typename Type, bool ForceCopy = false>
        void ForwardConstruct(Type *dest, Type *source) {
            if constexpr (!ForceCopy && std::is_move_assignable_v<Type>)
                new (dest) Type(std::move(*source));
            else
                new (dest) Type(*source);
        }

        /** @brief Helper used to perfect forward move / copy assignment */
        template<typename Type, bool ForceCopy = false>
        void ForwardAssign(Type *dest, Type *source) {
            if constexpr (!ForceCopy && std::is_move_assignable_v<Type>)
                *dest = std::move(*source);
            else
                *dest = *source;
        }

        /** @brief Theorical cacheline size */
        constexpr std::size_t CacheLineSize = 64ul;
    }
}

/**
 * @brief The SPSC queue is a lock-free queue that only supports a Single Producer and a Single Consumer
 * The queue is really fast compared to other more flexible implementations because the fact that only two thread can simultaneously read / write
 * means that less synchronization is needed for each operation.
 * The queue supports ranged push / pop to insert multiple elements without performance impact
 *
 * @tparam Type to be inserted
 */
template<typename Type>
class kF::Core::SPSCQueue
{
public:
    /** @brief Buffer structure containing all cells */
    struct Buffer
    {
        Type *data { nullptr };
        std::size_t capacity { 0 };
    };

    /** @brief Local thread cache */
    struct Cache
    {
        Buffer buffer {};
        std::size_t value { 0 };
    };

    /** @brief Default constructor initialize the queue */
    SPSCQueue(const std::size_t capacity);

    /** @brief Destruct and release all memory (unsafe) */
    ~SPSCQueue(void) { clear(); std::free(_buffer.data); }

    /** @brief Push a single element into the queue
     *  @return true if the element has been inserted */
    template<typename ...Args>
    [[nodiscard]] inline bool push(Args &&...args);

    /** @brief Pop a single element from the queue
     *  @return true if an element has been extracted */
    [[nodiscard]] inline bool pop(Type &value);

    /** @brief Clear all elements of the queue (unsafe) */
    void clear(void);

private:
    KF_ALIGN_CACHELINE std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    struct {
        Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
        std::size_t _headCache { 0 }; // Head cache for the producer
        char _pad0[Utils::CacheLineSize - sizeof(Buffer) - sizeof(std::size_t)];
    };

    KF_ALIGN_CACHELINE std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    struct{
        Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer2
        std::size_t _tailCache { 0 }; // Head cache for the consumer
        char _pad1[Utils::CacheLineSize - sizeof(Buffer) - sizeof(std::size_t)];
    };

    /** @brief Copy and move constructors disabled */
    SPSCQueue(const SPSCQueue &other) = delete;
    SPSCQueue(SPSCQueue &&other) = delete;
};

static_assert(sizeof(kF::Core::SPSCQueue<int>) == 4 * kF::Core::Utils::CacheLineSize);

template<typename Type>
kF::Core::SPSCQueue<Type>::SPSCQueue(const std::size_t capacity)
{
    _buffer.capacity = capacity;
    if (_buffer.data = reinterpret_cast<Type *>(std::malloc(sizeof(Type) * capacity)); !_buffer.data)
        throw std::runtime_error("Core::SPSCQueue: Malloc failed");
    _buffer2 = _buffer;
}

template<typename Type>
template<typename ...Args>
bool kF::Core::SPSCQueue<Type>::push(Args &&...args)
{
    static_assert(std::is_constructible<Type, Args...>::value, "Type must be constructible from Args...");

    const auto tail = _tail.load(std::memory_order_relaxed);
    auto next = tail + 1;

    if (next == _buffer.capacity) [[unlikely]]
        next = 0;
    if (auto head = _headCache; next == head) [[unlikely]] {
        head = _headCache = _head.load(std::memory_order_acquire);
        if (next == head) [[unlikely]]
            return false;
    }
    new (_buffer.data + tail) Type{ std::forward<Args>(args)... };
    _tail.store(next, std::memory_order_release);
    return true;
}

template<typename Type>
bool kF::Core::SPSCQueue<Type>::pop(Type &value)
{
    const auto head = _head.load(std::memory_order_relaxed);

    if (auto tail = _tailCache; head == tail) [[unlikely]] {
        tail = _tailCache = _tail.load(std::memory_order_acquire);
        if (head == tail) [[unlikely]]
            return false;
    }
    auto *elem = reinterpret_cast<Type *>(_buffer2.data + head);
    auto next = head + 1;
    if (next == _buffer2.capacity) [[unlikely]]
        next = 0;
    value = std::move(*elem);
    elem->~Type();
    _head.store(next, std::memory_order_release);
    return true;
}

template<typename Type>
void kF::Core::SPSCQueue<Type>::clear(void)
{
    for (Type type; pop(type););
}
Run Code Online (Sandbox Code Playgroud)

基准测试,使用谷歌基准测试。bench_SPSCQueue.cpp:

#include <thread>

#include <benchmark/benchmark.h>

#include "SPSCQueue.hpp"

using namespace kF;

using Queue = Core::SPSCQueue<std::size_t>;

constexpr std::size_t Capacity = 4096;

static void SPSCQueue_NoisyPush(benchmark::State &state)
{
    Queue queue(Capacity);
    std::atomic<bool> running = true;
    std::size_t i = 0ul;
    std::thread thd([&queue, &running] { for (std::size_t tmp; running; benchmark::DoNotOptimize(queue.pop(tmp))); });
    for (auto _ : state) {
        decltype(std::chrono::high_resolution_clock::now()) start;
        do {
            start = std::chrono::high_resolution_clock::now();
        } while (!queue.push(42ul));
        auto end = std::chrono::high_resolution_clock::now();
        auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
        auto iterationTime = elapsed.count();
        state.SetIterationTime(iterationTime);
    }
    running = false;
    if (thd.joinable())
        thd.join();
}
BENCHMARK(SPSCQueue_NoisyPush)->UseManualTime();

static void SPSCQueue_NoisyPop(benchmark::State &state)
{
    Queue queue(Capacity);
    std::atomic<bool> running = true;
    std::size_t i = 0ul;
    std::thread thd([&queue, &running] { while (running) benchmark::DoNotOptimize(queue.push(42ul)); });
    for (auto _ : state) {
        std::size_t tmp;
        decltype(std::chrono::high_resolution_clock::now()) start;
        do {
            start = std::chrono::high_resolution_clock::now();
        } while (!queue.pop(tmp));
        auto end = std::chrono::high_resolution_clock::now();
        auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
        auto iterationTime = elapsed.count();
        state.SetIterationTime(iterationTime);
    }
    running = false;
    if (thd.joinable())
        thd.join();
}
BENCHMARK(SPSCQueue_NoisyPop)->UseManualTime();

Run Code Online (Sandbox Code Playgroud)

Mat*_*iri 1

感谢您有用的评论(主要是感谢 Peter Cordes),问题似乎来自 L2 数据预取器。

由于我的 SPSC 队列设计,每个线程必须访问两个连续的缓存行才能推送/弹出队列。如果结构本身未对齐到 128 字节,则其地址将不会对齐到 128 字节,编译器将无法优化两个对齐的高速缓存行的访问。

因此,简单的修复方法是:

template<typename Type>
class alignas(128) SPSCQueue { ... };
Run Code Online (Sandbox Code Playgroud)

这里(第 2.5.5.4 节数据预取)是 Intel 的一篇有趣的论文,解释了其架构的优化以及如何在不同级别的缓存中完成预取。