C++ 11中的线程池

Ykt*_*ula 113 c++ multithreading threadpool c++11 stdthread

相关问题:

关于C++ 11:

关于Boost:


我如何获得一个线程池,以任务发送到,而无需创建和删除它们一遍又一遍?这意味着要在不加入的情况下重新同步的持久线程.


我的代码看起来像这样:

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[4] = {0};

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}
Run Code Online (Sandbox Code Playgroud)

我不想在每次迭代时创建和连接线程,而是希望每次迭代都将任务发送到我的工作线程,并且只创建一次.

小智 71

您可以使用C++线程池库,https://github.com/vit-vit/ctpl.

然后您编写的代码可以替换为以下代码

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] +=2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = std::min_element(arr, arr + 4);
    }
}
Run Code Online (Sandbox Code Playgroud)

您将获得所需数量的线程,并且不会在迭代中反复创建和删除它们.

  • 这应该是答案; 单头,可读,简单,简洁,符合标准的C++ 11库.做得好! (10认同)

Ker*_* SB 61

线程池意味着所有线程一直在运行 - 换句话说,线程函数永远不会返回.为了给线程一些有意义的事情,你必须设计一个线程间通信系统,既可以告诉线程有什么要做,也可以用来传递实际的工作数据.

通常,这将涉及某种并发数据结构,并且每个线程可能会睡在某种条件变量上,当有工作要做时会通知它.在接收到通知时,一个或多个线程唤醒,从并发数据结构恢复任务,处理它,并以类似的方式存储结果.

然后该线程将继续检查是否还有更多工作要做,如果没有再回去睡觉.

结果是你必须自己设计所有这些,因为没有一种普遍适用的"工作"的自然概念.这是相当多的工作,你需要做一些微妙的问题.(如果你喜欢在幕后为你负责线程管理的系统,你可以在Go中编程.)

  • "你必须自己设计所有这些"< - 这就是我要避免做的事情.但是Goroutines看起来很棒. (9认同)
  • @Yktula:好吧,这是一项非常重要的任务。从您的帖子中甚至还不清楚您要完成哪种工作,而这对于解决方案至关重要。您可以在C ++中实现Go,但这将是非常具体的事情,一半的人会抱怨他们想要不同的东西。 (2认同)

PhD*_*EcE 58

这是从我的回答复制到另一个非常相似的帖子,希望它可以帮助:

1)从系统可以支持的最大线程数开始:

int Num_Threads =  thread::hardware_concurrency();
Run Code Online (Sandbox Code Playgroud)

2)对于高效的线程池实现,一旦根据Num_Threads创建线程,最好不要创建新的线程,或者销毁旧线程(通过连接).会有性能损失,甚至可能使您的应用程序比串行版本慢.

每个C++ 11线程都应该在其函数中运行,并且无限循环,不断等待抓取和运行新任务.

以下是如何将此类函数附加到线程池:

int Num_Threads = thread::hardware_concurrency();
vector<thread> Pool;
for(int ii = 0; ii < Num_Threads; ii++)
{  Pool.push_back(thread(Infinite_loop_function));}
Run Code Online (Sandbox Code Playgroud)

3)Infinite_loop_function

这是一个等待任务队列的"while(true)"循环

void The_Pool:: Infinite_loop_function()
{
    while(true)
    {
        {
            unique_lock<mutex> lock(Queue_Mutex);

            condition.wait(lock, []{return !Queue.empty() || therminate_pool});
            Job = Queue.front();
            Queue.pop();
        }
        Job(); // function<void()> type
    }
};
Run Code Online (Sandbox Code Playgroud)

4)创建一个向您的队列添加作业的功能

void The_Pool:: Add_Job(function<void()> New_Job)
{
    {
        unique_lock<mutex> lock(Queue_Mutex);
        Queue.push(New_Job);
    }
    condition.notify_one();
}
Run Code Online (Sandbox Code Playgroud)

5)将任意函数绑定到您的队列

Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));
Run Code Online (Sandbox Code Playgroud)

一旦集成了这些成分,您就拥有了自己的动态线程池.这些线程总是运行,等待工作.

如果有一些语法错误我道歉,我键入了这些代码并且我的内存不好.很抱歉,我无法向您提供完整的线程池代码,这会违反我的工作诚信.

  • 我猜想在方法 `Start()` 中, `threads.at(i) = std::thread(ThreadLoop);` 应该是 `threads.at(i) = std::thread(&amp;ThreadPool::ThreadLoop, this) ;` (11认同)
  • “Infinite_loop_function”是一个有趣的名称,它是一个函数,它消耗队列中的任务并执行它们。 (6认同)
  • 当您终止并且没有剩余工作时会发生什么? (4认同)
  • 我认为有一个错误: `poolbusy = jobs.empty()` 应该是 `poolbusy = !jobs.empty()` (3认同)
  • @ChristopherPisz `std::vector` 不要求其元素是可复制的。您可以使用仅移动类型(`unique_ptr`、`thread`、`future` 等)的向量。 (2认同)
  • shutdown()中,应该是thread_vector.clear();而不是 thread_vector.empty(); 正确的? (2认同)
  • `busy()` 不应该有返回类型 `bool` 吗? (2认同)

did*_*erc 18

线程池是核心的一组线程,所有线程都绑定到作为事件循环的函数.这些线程将无休止地等待执行任务或自己终止.

线程池作业是提供一个接口来提交作业,定义(并可能修改)运行这些作业的策略(调度规则,线程实例化,池的大小),以及监视线程和相关资源的状态.

因此,对于一个多功能池,必须首先定义任务是什么,如何启动,中断,结果是什么(请参阅该问题的承诺和未来的概念),线程必须响应什么类型的事件他们将如何处理这些事件,以及如何将这些事件与任务处理的事件区分开来.正如您所见,这可能变得非常复杂,并且随着解决方案变得越来越复杂,对线程的工作方式施加限制.

当前用于处理事件的工具是相当准确的(*):诸如互斥体,条件变量之类的原语,以及在它之上的一些抽象(锁,障碍).但在某些情况下,这些abstration可能会变得不合适(参见相关问题),并且必须恢复使用原语.

还必须管理其他问题:

  • 信号
  • I/O
  • 硬件(处理器亲和力,异构设置)

这些如何在你的环境中发挥作用?

对类似问题的这个答案指向一个用于boost和stl的现有实现.

我为另一个问题提供了一个非常粗略的线程池实现,它没有解决上面列出的许多问题.你可能想要建立它.您可能还希望了解其他语言的现有框架,以寻找灵感.


(*)我不认为这是一个问题,恰恰相反.我认为这是从C继承的C++的精神.


pio*_*pio 11

Follwoing [PhD EcE](/sf/users/267289221/) suggestion, I implemented the thread pool:
Run Code Online (Sandbox Code Playgroud)

函数池.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:

    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};
Run Code Online (Sandbox Code Playgroud)

函数池.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    lock.unlock();
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    m_data_condition.notify_all();
    //notify all waiting threads.
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be release automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}
Run Code Online (Sandbox Code Playgroud)

主程序

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "stating operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 谢谢!这确实帮助我开始了并行线程操作。我最终使用了您的实现的稍微修改版本。 (3认同)
  • 您不需要 **m_accept_functions** 成为原子类型。**m_accept_functions** 受互斥锁保护。 (2认同)

Ami*_*ati 8

您可以使用boost 库中的thread_pool

void my_task(){...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);

    // Submit a function to the pool.
    boost::asio::post(pool, my_task);

    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
      ...
    });
}
Run Code Online (Sandbox Code Playgroud)

您还可以使用来自开源社区的线程池

void first_task() {...}    
void second_task() {...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    pool tp(threadNumbers);

    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}
Run Code Online (Sandbox Code Playgroud)