使用std :: atomic和std :: condition_variable,Sync是不可靠的

Ine*_*ion 11 c++ multithreading stl c++11

在用C++ 11编写的分布式作业系统中,我使用以下结构实现了一个fence(即工作线程池外部的线程可能会要求阻塞,直到完成所有当前计划的作业):

struct fence
{
    std::atomic<size_t>                     counter;
    std::mutex                              resume_mutex;
    std::condition_variable                 resume;

    fence(size_t num_threads)
        : counter(num_threads)
    {}
};
Run Code Online (Sandbox Code Playgroud)

实现fence的代码如下所示:

void task_pool::fence_impl(void *arg)
{
    auto f = (fence *)arg;
    if (--f->counter == 0)      // (1)
        // we have zeroed this fence's counter, wake up everyone that waits
        f->resume.notify_all(); // (2)
    else
    {
        unique_lock<mutex> lock(f->resume_mutex);
        f->resume.wait(lock);   // (3)
    }
}
Run Code Online (Sandbox Code Playgroud)

如果线程在一段时间内进入围栏,这种方法非常有效.然而,如果他们几乎同时尝试这样做,似乎有时会发生在原子递减(1)和开始条件var(3)的等待之间,线程产生CPU时间而另一个线程将计数器递减到零( 1)并解雇cond.var(2).这导致前一个线程在(3)中永远等待,因为它已经被通知后开始等待它.

让事情变得可行的黑客就是在(2)之前进行10毫秒的睡眠,但这显然是不可接受的.

有关如何以高效的方式解决这个问题的任何建议?

Max*_*kin 13

您的诊断是正确的,此代码很容易以您描述的方式丢失条件通知.即在一个线程锁定互斥锁之后但在等待条件变量之前,另一个线程可能会调用notify_all(),以便第一个线程错过该通知.

一个简单的解决方法是在递减计数器之前锁定互斥锁,同时通知:

void task_pool::fence_impl(void *arg)
{
    auto f = static_cast<fence*>(arg);
    std::unique_lock<std::mutex> lock(f->resume_mutex);
    if (--f->counter == 0) {
        f->resume.notify_all();
    }
    else do {
        f->resume.wait(lock);
    } while(f->counter);
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,计数器不必是原子的.

在通知之前锁定互斥锁的额外奖励(或惩罚,取决于观点)是(从这里):

pthread_cond_broadcast()或pthread_cond_signal()函数可以由线程调用,无论它当前是否拥有调用pthread_cond_wait()或pthread_cond_timedwait()的线程在等待期间与条件变量相关联的互斥锁; 但是,如果需要可预测的调度行为,则该互斥锁应由调用pthread_cond_broadcast()或pthread_cond_signal()的线程锁定.

关于while循环(从这里):

可能会发生pthread_cond_timedwait()或pthread_cond_wait()函数的虚假唤醒.由于从pthread_cond_timedwait()或pthread_cond_wait()返回并不意味着有关此谓词的值的任何内容,因此应在返回时重新评估谓词.