std::condition_variable::notify_all() 仅唤醒线程池中的一个线程

Tin*_*lpe 1 c++ std condition-variable threadpool

我正在尝试编写一个非常简单的线程池来了解它们在幕后是如何工作的。不过,我遇到了问题。当我使用condition_variable并调用notify_all()时,它只会唤醒池中的一个线程。

其他一切都很好。我已经排队了 900 个工作,每个工作都有不错的负载。唤醒的一个线程会消耗所有这些作业,然后返回睡眠状态。在下一个循环中,这一切都会再次发生。

问题是只有一个线程完成这项工作!我怎么搞砸了这个模板?

线程池.h:

#pragma once

#include <mutex>
#include <stack>
#include <atomic>
#include <thread>
#include <condition_variable>

class ThreadPool
{
friend void __stdcall ThreadFunc();

public:
    static ThreadPool & GetInstance()
    {
        static ThreadPool sInstance;

        return (sInstance);
    }

public:
    void AddJob(Job * job);
    void DoAllJobs();

private:
    Job * GetJob();

private:
    const static uint32_t ThreadCount = 8;

    std::mutex                  JobMutex;
    std::stack<Job *>           Jobs;
    volatile std::atomic<int>   JobWorkCounter;
    std::mutex                  SharedLock;
    std::thread                 Threads[ThreadCount];
    std::condition_variable     Signal;

private:
    ThreadPool();
    ~ThreadPool();

public:
    ThreadPool(ThreadPool const &) = delete;
    void operator = (ThreadPool const &) = delete;
};
Run Code Online (Sandbox Code Playgroud)

线程池.cpp:

#include "ThreadPool.h"

void __stdcall ThreadFunc()
{
    std::unique_lock<std::mutex> lock(ThreadPool::GetInstance().SharedLock);

    while (true)
    {
        ThreadPool::GetInstance().Signal.wait(lock);

        while (Job * job = ThreadPool::GetInstance().GetJob())
        {
            job->_jobFn(job->_args);
            ThreadPool::GetInstance().JobWorkCounter--;
        }
    }
}

ThreadPool::ThreadPool()
{
    JobWorkCounter = 0;

    for (uint32_t i = 0; i < ThreadCount; ++i)
        Threads[i] = std::thread(ThreadFunc);
}

ThreadPool::~ThreadPool()
{
}

void ThreadPool::AddJob(Job * job)
{
    JobWorkCounter++;

    JobMutex.lock();
    {
        Jobs.push(job);
    }
    JobMutex.unlock();
}

void ThreadPool::DoAllJobs()
{
    Signal.notify_all();

    while (JobWorkCounter > 0)
    {
        Sleep(0);
    }
}

Job * ThreadPool::GetJob()
{
    Job * return_value = nullptr;

    JobMutex.lock();
    {
        if (Jobs.empty() == false)
        {
            return_value = Jobs.top();
            Jobs.pop();
        }
    }
    JobMutex.unlock();

    return (return_value);
}
Run Code Online (Sandbox Code Playgroud)

谢谢你的帮助!抱歉发布了这么大的代码。

Yak*_*ont 5

除非您想设计新模式,否则使用条件变量的简单“猴子看猴子做”方式始终是三件事。

条件变量、互斥锁和消息。

std::condition_variable cv;
mutable std::mutex m;
your_message_type message;
Run Code Online (Sandbox Code Playgroud)

然后有 3 种模式可供遵循。发送一条消息:

std::unique_lock l{m}; // C++17, don't need to pass type
set_message_data(message);
cv.notify_one();
Run Code Online (Sandbox Code Playgroud)

发送大量消息:

std::unique_lock l{m};
set_lots_of_message_data(message);
cv.notify_all();
Run Code Online (Sandbox Code Playgroud)

最后,等待并处理消息:

while(true) {
  auto data = [&]()->std::optional<data_to_process>{
    std::unique_lock l{m};
    cv.wait( l, [&]{ return done() || there_is_a_message(message); } );
    if (done()) return {};
    return get_data_to_process(message);
  }();
  if (!data) break;
  auto& data_to_process = *data;
  // process the data
}
Run Code Online (Sandbox Code Playgroud)

有一定的灵活性。但有许多规则需要遵循。

  1. 在设置消息数据和通知之间,您必须锁定互斥锁。

  2. 您应该始终使用 lambda 版本wait——如果不使用 lambda 版本,则意味着您 100 次中有 99 次都做错了。

  3. 如果没有烦人的线程和锁之类的东西,消息数据应该足以确定是否应该完成任务。

  4. 仅使用 RAII 方式来锁定/解锁互斥体。没有它,正确性几乎是不可能的。

  5. 处理东西时不要持有锁。保持锁定足够长的时间以使数据得到处理,然后释放锁定。

你的代码违反了2、3、4、5。我认为你没有搞砸1。

然而,如果您在通知时保持对 cv 的锁定,现代 cv 实现实际上非常高效。

我认为最明显的症状来自 3:你的工作线程总是持有一把锁,所以只有一个可以前进。其他问题会导致您的代码出现其他问题。


现在,超越这种相对简单的模式是可能的。但是一旦你这样做了,你确实需要至少对 C++ 线程模型有一个基本的了解,并且你不能通过编写代码来学习并“看看它是否有效”。您必须坐下来阅读规范,仔细阅读它们,了解条件变量在标准中的作用,了解互斥体的作用,编写一些代码,坐下来找出它不起作用的原因,找到其他编写类似代码的人代码有问题,弄清楚其他人是如何调试它并发现错误的,返回到你的代码并找到相同的错误,调整它,然后重复。

这就是我使用条件变量编写原语的原因,我不将条件变量与其他逻辑混合(例如,维护线程池)。

编写一个线程安全的队列。它所做的只是维护一个队列并在有数据要读取时通知消费者。

最简单的一个有 3 个成员变量——一个互斥锁、一个条件变量和一个 std 队列。

然后用关闭功能来增强它——现在 pop 必须返回一个可选的或者有一些其他的失败路径。

您的任务需要在全部执行之前对任务进行批处理。你确定你想要那个吗?为此,我要做的就是向线程安全队列添加“推送多个任务”接口。然后将“未准备好”的任务维护在非线程安全队列中,只有当我们希望线程消耗它们时才将它们全部推送。

然后“线程池”消耗线程安全队列。因为我们单独编写了线程安全队列,所以我们的移动部件数量减少了一半,这意味着关系减少了 4 倍。

线程化代码很困难。尊重它。