Ove*_*ade 7 c++ atomic wait stdatomic c++20
考虑以下示例代码,其中线程 A 将函数推送到队列中,线程 B 在从队列中弹出时执行这些函数:
std::atomic<uint32_t> itemCount;
//Executed by thread A
void run(std::function<void()> function) {
    if (queue.push(std::move(function))) {
        itemCount.fetch_add(1, std::memory_order_acq_rel);
        itemCount.notify_one();
    }
}
//Executed by thread B
void threadMain(){
    std::function<void()> function;
    while(true){
        if (queue.pop(function)) {
            itemCount.fetch_sub(1, std::memory_order_acq_rel);
            function();
        }else{
            itemCount.wait(0, std::memory_order_acquire);
        }
    }
}
其中queue是一个并发队列,它有一个push和一个pop函数,每个函数返回一个bool指示给定操作是否成功。所以如果满了就push返回,如果空就返回。falsepopfalse
现在我想知道代码是否在所有情况下都是线程安全的。假设线程 Bpop失败并且即将调用std::atomic<T>::wait. 同时,线程 A 推送一个新元素,而线程 B 检查初始等待条件。由于itemCount尚未更改,因此失败。
紧接着,线程 A 增加计数器并尝试通知一个正在等待的线程(尽管线程 B 尚未在内部等待)。线程 B 最终等待原子,导致线程由于丢失信号而永远不会再次醒来,尽管队列中有一个元素。只有当新元素被推入队列时才会停止,通知 B 继续执行。
我无法手动重现这种情况,因为时间几乎不可能正确。
这是一个严重的问题还是不可能发生?为了解决这种罕见的情况,确实存在哪些(最好是原子的)替代方案?
编辑:顺便提一下,队列不是阻塞的,仅利用原子操作。
我问的原因是我不明白如何实现原子wait操作。尽管标准说整个操作是原子的(由加载+谓词检查+等待组成),但在我使用的实现中,std::atomic<T>::wait大致实现如下:
void wait(const _TVal _Expected, const memory_order _Order = memory_order_seq_cst) const noexcept {
    _Atomic_wait_direct(this, _Atomic_reinterpret_as<long>(_Expected), _Order);
}
其中_Atomic_wait_direct定义为
template <class _Ty, class _Value_type>
void _Atomic_wait_direct(
    const _Atomic_storage<_Ty>* const _This, _Value_type _Expected_bytes, const memory_order _Order) noexcept {
    const auto _Storage_ptr = _STD addressof(_This->_Storage);
    for (;;) {
        const _Value_type _Observed_bytes = _Atomic_reinterpret_as<_Value_type>(_This->load(_Order));
        if (_Expected_bytes != _Observed_bytes) {
            return;
        }
        __std_atomic_wait_direct(_Storage_ptr, &_Expected_bytes, sizeof(_Value_type), _Atomic_wait_no_timeout);
    }
}
我们可以清楚地看到有一个具有指定内存顺序的原子加载来检查原子本身的状态。但是,我不明白如何将整个操作视为原子操作,因为在调用__std_atomic_wait_direct.
对于条件变量,谓词本身由互斥体保护,但是原子本身如何在这里受到保护?
该标准的内容如下:
\n\n\n[intro.races]/4对特定原子对象的所有修改都
\nM以某种特定的总顺序发生,称为 的修改顺序M。
\n\n[atomics.wait]/4对原子对象上的原子等待操作的调用
\nM可以通过调用原子通知操作来解除阻塞,M如果存在副作用X,Y则M:
\n(4.1) \xe2\ x80\x94 原子等待操作在观察 的结果后已被阻塞X,
\n(4.2) \xe2\x80\x94在 的修改顺序中X领先,并且\n(4.3) \xe2\x80\x94发生在调用之前原子通知操作。YMY
您假设以下场景:
\nitemCount为零,无论是来自原始初始化还是先前的fetch_sub。wait加载itemCount并观察值 0。fetch_add和notify_onewait发生阻塞,因为它相信现在已经过时的值 0。在这种情况下,isM是使值变为 0 的旧值(我们假设它发生很久以前并且对所有线程都正确可见),是将值更改为 1 的旧值。itemCountXfetch_subYfetch_add
该标准规定,该wait调用(跨越步骤 2 和 4)实际上可以通过notify_one步骤 3 中的调用来解锁。确实:
(4.1) -wait观察后已阻塞itemCount == 0(如果没有,则问题不会出现)。
\n(4.2) -fetch_sub位于fetch_add修改顺序之前itemCount(假设fetch_sub很久以前发生)。
\n(4.3) -fetch_add发生在之前(事实上,是在之前排序的)notify_one;它们被同一个线程依次调用。
因此,符合要求的实现必须要么wait一开始就不允许阻塞,要么已经notify_one唤醒它;它不能允许错过通知。
本次讨论中唯一涉及内存顺序的地方是 (4.1),“原子等待操作在观察结果后已被阻塞X”。也许fetch_add实际上发生在之前wait(按挂钟),但对 不可见wait,所以无论如何它都会阻塞。但这与结果无关——要么wait观察结果fetch_add并且根本不阻塞;或者它观察旧的结果fetch_sub并阻塞,但notfiy_one需要唤醒它。