线程在 C++ 中完成后将新任务分配给线程

Mut*_*thm 4 c++ concurrency multithreading stdthread

我在 C++ 中有以下代码。代码来自C++ Concurrency In Action: Practical Multithreading

void do_work(unsigned id);

void f() {
    std::vector<std::thread> threads;
    for(unsigned i = 0; i < 20; ++i) {
        threads.push_back(std::thread(do_work, i));
    }
    std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
}
Run Code Online (Sandbox Code Playgroud)

假设threads[0]已经完成处理并返回一个值。我还有更多文件要处理,现在想将此新文件分配给一个已完成的线程。如何在 C++ 中实现这种行为?或者我必须销毁线程,然后在线程完成后创建一个新线程?但是,我如何检查这些线程中的任何一个是否已完成?

Jts*_*Jts 6

下面是 Sam Varshavchik 解释的基本实现。

现场演示

我添加 a 的原因local_queue是为了确保我们立即m_Mutex解锁。如果删除它,调用的线程push_task可能会阻塞。

析构函数调用stop()which 设置m_Runningfalse,通知线程有关它,并等待它完成处理所有剩余任务。

如果工人类死了,线程也会死,这很好。

我的示例只创建 3 个线程和每个线程 5 个任务for (int i = 0; i < 5; i++),主要是为了确保所有输出都显示在ideone 中,但我已经用 10 个线程和每个线程 5000 个任务对其进行了测试,并且运行良好。

do_work函数有两行,如果您希望输出流正确同步,您可以取消注释。此类具有多线程支持。

您可以根据需要多次stop()重复start()该线程

class Worker
{
public:
    Worker(bool start) : m_Running(start) { if (start) private_start(); }
    Worker() : m_Running(false) { }
    ~Worker() { stop(); }

    template<typename... Args>
    void push_task(Args&&... args)
    {
        {
            std::lock_guard<std::mutex> lk(m_Mutex);
            m_Queue.push_back(std::bind(std::forward<Args>(args)...));
        }

        m_Condition.notify_all();
    }

    void start()
    {
        {
            std::lock_guard<std::mutex> lk(m_Mutex);
            if (m_Running == true) return;
            m_Running = true;
        }

        private_start();
    }

    void stop()
    {
        {
            std::lock_guard<std::mutex> lk(m_Mutex);
            if (m_Running == false) return;
            m_Running = false;
        }

        m_Condition.notify_all();
        m_Thread.join();
    }

private:
    void private_start()
    {
        m_Thread = std::thread([this]
        {
            for (;;)
            {
                decltype(m_Queue) local_queue;
                {
                    std::unique_lock<std::mutex> lk(m_Mutex);
                    m_Condition.wait(lk, [&] { return !m_Queue.empty() + !m_Running; });

                    if (!m_Running)
                    {
                        for (auto& func : m_Queue)
                            func();

                        m_Queue.clear();
                        return;
                    }

                    std::swap(m_Queue, local_queue);
                }

                for (auto& func : local_queue)
                    func();
            }
        });
    }

private:
    std::condition_variable m_Condition;
    std::list<std::function<void()>> m_Queue;
    std::mutex m_Mutex;
    std::thread m_Thread;
    bool m_Running = false;
};

void do_work(unsigned id)
{
    //static std::mutex cout_mutex;
    //std::lock_guard<std::mutex> lk(cout_mutex);
    std::cout << id << std::endl;
}

int main()
{
    {
        Worker workers[3];
        int counter = 0;

        for (auto& worker : workers)
            worker.start();

        for (auto& worker : workers)
        {
            for (int i = 0; i < 5; i++)
                worker.push_task(do_work, ++counter + i);
        }
    }

    std::cout << "finish" << std::endl;
    getchar();

    return 0;
}
Run Code Online (Sandbox Code Playgroud)


Sam*_*hik 3

对于“如何在 C++ 中实现此行为”的简短回答是,简单地编写代码来执行此操作。您自己确定的第一步是“如何检查这些线程是否已完成”。

有几种基本方法。但它们都归结为同一件事:不是让每个线程简单地消失,而是在每个线程终止之前通知父进程它已完成。

对于初学者来说,每个线程都应该知道它是哪个线程。在您的示例中,所有线程都放置在 a 中std::vector,并且它们由向量的索引标识。这不是唯一的方法。还有其他方法可以管理所有线程,但出于答案的目的,这就可以了。

然后,每个线程需要通过将线程索引号作为线程参数传递来知道它是什么索引。你的代码已经做到了。精彩的。

现在,要简单地结束循环:您只需使用 a 实例化 a std::mutex,std::condition_variable即可保护 astd::queue或 a std::list。或者,也许是std::set整数的 a。您可以自由决定哪个容器最适合您。

然后,在每个线程终止之前,它:

  • 锁定互斥体。

  • 将其线程索引放入容器中。

  • 表示条件变量。

  • 解锁互斥体,然后它立即返回,终止该线程。

然后是父线程,它启动了所有线程:

  • 锁定互斥体

  • 检查队列/集合/其他内容是否为空。如果是,则等待条件变量,直到不是为止。

  • 从队列/集合/其他内容中删除线程索引,并加入该线程。该线程刚刚终止。现在您知道哪个线程被终止,并且可以使用该信息执行您想要的操作。

  • 完成处理或重新启动线程后,它会再次检查队列是否为空。