Ykt*_*ula 113 c++ multithreading threadpool c++11 stdthread
相关问题:
关于C++ 11:
关于Boost:
我如何获得一个线程池,以任务发送到,而无需创建和删除它们一遍又一遍?这意味着要在不加入的情况下重新同步的持久线程.
我的代码看起来像这样:
namespace {
std::vector<std::thread> workers;
int total = 4;
int arr[4] = {0};
void each_thread_does(int i) {
arr[i] += 2;
}
}
int main(int argc, char *argv[]) {
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
workers.push_back(std::thread(each_thread_does, j));
}
for (std::thread &t: workers) {
if (t.joinable()) {
t.join();
}
}
arr[4] = std::min_element(arr, arr+4);
}
return 0;
}
Run Code Online (Sandbox Code Playgroud)
我不想在每次迭代时创建和连接线程,而是希望每次迭代都将任务发送到我的工作线程,并且只创建一次.
小智 71
您可以使用C++线程池库,https://github.com/vit-vit/ctpl.
然后您编写的代码可以替换为以下代码
#include <ctpl.h> // or <ctpl_stl.h> if ou do not have Boost library
int main (int argc, char *argv[]) {
ctpl::thread_pool p(2 /* two threads in the pool */);
int arr[4] = {0};
std::vector<std::future<void>> results(4);
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
results[j] = p.push([&arr, j](int){ arr[j] +=2; });
}
for (int j = 0; j < 4; ++j) {
results[j].get();
}
arr[4] = std::min_element(arr, arr + 4);
}
}
Run Code Online (Sandbox Code Playgroud)
您将获得所需数量的线程,并且不会在迭代中反复创建和删除它们.
Ker*_* SB 61
线程池意味着所有线程一直在运行 - 换句话说,线程函数永远不会返回.为了给线程一些有意义的事情,你必须设计一个线程间通信系统,既可以告诉线程有什么要做,也可以用来传递实际的工作数据.
通常,这将涉及某种并发数据结构,并且每个线程可能会睡在某种条件变量上,当有工作要做时会通知它.在接收到通知时,一个或多个线程唤醒,从并发数据结构恢复任务,处理它,并以类似的方式存储结果.
然后该线程将继续检查是否还有更多工作要做,如果没有再回去睡觉.
结果是你必须自己设计所有这些,因为没有一种普遍适用的"工作"的自然概念.这是相当多的工作,你需要做一些微妙的问题.(如果你喜欢在幕后为你负责线程管理的系统,你可以在Go中编程.)
PhD*_*EcE 58
这是从我的回答复制到另一个非常相似的帖子,希望它可以帮助:
1)从系统可以支持的最大线程数开始:
int Num_Threads = thread::hardware_concurrency();
Run Code Online (Sandbox Code Playgroud)
2)对于高效的线程池实现,一旦根据Num_Threads创建线程,最好不要创建新的线程,或者销毁旧线程(通过连接).会有性能损失,甚至可能使您的应用程序比串行版本慢.
每个C++ 11线程都应该在其函数中运行,并且无限循环,不断等待抓取和运行新任务.
以下是如何将此类函数附加到线程池:
int Num_Threads = thread::hardware_concurrency();
vector<thread> Pool;
for(int ii = 0; ii < Num_Threads; ii++)
{ Pool.push_back(thread(Infinite_loop_function));}
Run Code Online (Sandbox Code Playgroud)
3)Infinite_loop_function
这是一个等待任务队列的"while(true)"循环
void The_Pool:: Infinite_loop_function()
{
while(true)
{
{
unique_lock<mutex> lock(Queue_Mutex);
condition.wait(lock, []{return !Queue.empty() || therminate_pool});
Job = Queue.front();
Queue.pop();
}
Job(); // function<void()> type
}
};
Run Code Online (Sandbox Code Playgroud)
4)创建一个向您的队列添加作业的功能
void The_Pool:: Add_Job(function<void()> New_Job)
{
{
unique_lock<mutex> lock(Queue_Mutex);
Queue.push(New_Job);
}
condition.notify_one();
}
Run Code Online (Sandbox Code Playgroud)
5)将任意函数绑定到您的队列
Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));
Run Code Online (Sandbox Code Playgroud)
一旦集成了这些成分,您就拥有了自己的动态线程池.这些线程总是运行,等待工作.
如果有一些语法错误我道歉,我键入了这些代码并且我的内存不好.很抱歉,我无法向您提供完整的线程池代码,这会违反我的工作诚信.
did*_*erc 18
线程池是核心的一组线程,所有线程都绑定到作为事件循环的函数.这些线程将无休止地等待执行任务或自己终止.
线程池作业是提供一个接口来提交作业,定义(并可能修改)运行这些作业的策略(调度规则,线程实例化,池的大小),以及监视线程和相关资源的状态.
因此,对于一个多功能池,必须首先定义任务是什么,如何启动,中断,结果是什么(请参阅该问题的承诺和未来的概念),线程必须响应什么类型的事件他们将如何处理这些事件,以及如何将这些事件与任务处理的事件区分开来.正如您所见,这可能变得非常复杂,并且随着解决方案变得越来越复杂,对线程的工作方式施加限制.
当前用于处理事件的工具是相当准确的(*):诸如互斥体,条件变量之类的原语,以及在它之上的一些抽象(锁,障碍).但在某些情况下,这些abstration可能会变得不合适(参见相关问题),并且必须恢复使用原语.
还必须管理其他问题:
这些如何在你的环境中发挥作用?
对类似问题的这个答案指向一个用于boost和stl的现有实现.
我为另一个问题提供了一个非常粗略的线程池实现,它没有解决上面列出的许多问题.你可能想要建立它.您可能还希望了解其他语言的现有框架,以寻找灵感.
(*)我不认为这是一个问题,恰恰相反.我认为这是从C继承的C++的精神.
pio*_*pio 11
Follwoing [PhD EcE](/sf/users/267289221/) suggestion, I implemented the thread pool:
Run Code Online (Sandbox Code Playgroud)
函数池.h
#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>
class Function_pool
{
private:
std::queue<std::function<void()>> m_function_queue;
std::mutex m_lock;
std::condition_variable m_data_condition;
std::atomic<bool> m_accept_functions;
public:
Function_pool();
~Function_pool();
void push(std::function<void()> func);
void done();
void infinite_loop_func();
};
Run Code Online (Sandbox Code Playgroud)
函数池.cpp
#include "function_pool.h"
Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}
Function_pool::~Function_pool()
{
}
void Function_pool::push(std::function<void()> func)
{
std::unique_lock<std::mutex> lock(m_lock);
m_function_queue.push(func);
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
lock.unlock();
m_data_condition.notify_one();
}
void Function_pool::done()
{
std::unique_lock<std::mutex> lock(m_lock);
m_accept_functions = false;
lock.unlock();
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
m_data_condition.notify_all();
//notify all waiting threads.
}
void Function_pool::infinite_loop_func()
{
std::function<void()> func;
while (true)
{
{
std::unique_lock<std::mutex> lock(m_lock);
m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
if (!m_accept_functions && m_function_queue.empty())
{
//lock will be release automatically.
//finish the thread loop and let it join in the main thread.
return;
}
func = m_function_queue.front();
m_function_queue.pop();
//release the lock
}
func();
}
}
Run Code Online (Sandbox Code Playgroud)
主程序
#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>
Function_pool func_pool;
class quit_worker_exception : public std::exception {};
void example_function()
{
std::cout << "bla" << std::endl;
}
int main()
{
std::cout << "stating operation" << std::endl;
int num_threads = std::thread::hardware_concurrency();
std::cout << "number of threads = " << num_threads << std::endl;
std::vector<std::thread> thread_pool;
for (int i = 0; i < num_threads; i++)
{
thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
}
//here we should send our functions
for (int i = 0; i < 50; i++)
{
func_pool.push(example_function);
}
func_pool.done();
for (unsigned int i = 0; i < thread_pool.size(); i++)
{
thread_pool.at(i).join();
}
}
Run Code Online (Sandbox Code Playgroud)
您可以使用boost 库中的thread_pool:
void my_task(){...}
int main(){
int threadNumbers = thread::hardware_concurrency();
boost::asio::thread_pool pool(threadNumbers);
// Submit a function to the pool.
boost::asio::post(pool, my_task);
// Submit a lambda object to the pool.
boost::asio::post(pool, []() {
...
});
}
Run Code Online (Sandbox Code Playgroud)
您还可以使用来自开源社区的线程池:
void first_task() {...}
void second_task() {...}
int main(){
int threadNumbers = thread::hardware_concurrency();
pool tp(threadNumbers);
// Add some tasks to the pool.
tp.schedule(&first_task);
tp.schedule(&second_task);
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
171472 次 |
| 最近记录: |