Syn*_*ose 11 c++ multithreading wait threadpool stdthread
我目前在我的线程池中有一个带有x worker 的程序.在主循环Ÿ任务分配给工人来完成,但之后的任务被发送出去,我必须与节目之前之前等待所有任务完成.我相信我目前的解决方案效率低下,必须有更好的方法等待所有任务完成,但我不知道如何去解决这个问题
// called in main after all tasks are enqueued to
// std::deque<std::function<void()>> tasks
void ThreadPool::waitFinished()
{
while(!tasks.empty()) //check if there are any tasks in queue waiting to be picked up
{
//do literally nothing
}
}
Run Code Online (Sandbox Code Playgroud)
更多信息:
线程池结构
//worker thread objects
class Worker {
public:
Worker(ThreadPool& s): pool(s) {}
void operator()();
private:
ThreadPool &pool;
};
//thread pool
class ThreadPool {
public:
ThreadPool(size_t);
template<class F>
void enqueue(F f);
void waitFinished();
~ThreadPool();
private:
friend class Worker;
//keeps track of threads so we can join
std::vector< std::thread > workers;
//task queue
std::deque< std::function<void()> > tasks;
//sync
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
Run Code Online (Sandbox Code Playgroud)
或者这是我的threadpool.hpp 的要点
我想要用的例子waitFinished()
:
while(running)
//....
for all particles alive
push particle position function to threadpool
end for
threadPool.waitFinished();
push new particle position data into openGL buffer
end while
Run Code Online (Sandbox Code Playgroud)
所以这样我可以发送成千上万的粒子位置任务并行完成,等待它们完成并将新数据放入openGL位置缓冲区
Who*_*aig 16
这是做你正在尝试的一种方法.在同一个互斥锁上使用两个条件变量不是为了轻松,除非你知道内部发生了什么.除了我希望展示每次运行之间完成了多少项目之外,我不需要原子处理过的成员.
此示例工作负载函数生成一百万个随机int值,然后对它们进行排序(必须以某种方式加热我的办公室). waitFinished
在队列为空且没有线程忙的情况下才会返回.
#include <iostream>
#include <deque>
#include <functional>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <random>
//thread pool
class ThreadPool
{
public:
ThreadPool(unsigned int n = std::thread::hardware_concurrency());
template<class F> void enqueue(F&& f);
void waitFinished();
~ThreadPool();
unsigned int getProcessed() const { return processed; }
private:
std::vector< std::thread > workers;
std::deque< std::function<void()> > tasks;
std::mutex queue_mutex;
std::condition_variable cv_task;
std::condition_variable cv_finished;
std::atomic_uint processed;
unsigned int busy;
bool stop;
void thread_proc();
};
ThreadPool::ThreadPool(unsigned int n)
: busy()
, processed()
, stop()
{
for (unsigned int i=0; i<n; ++i)
workers.emplace_back(std::bind(&ThreadPool::thread_proc, this));
}
ThreadPool::~ThreadPool()
{
// set stop-condition
std::unique_lock<std::mutex> latch(queue_mutex);
stop = true;
cv_task.notify_all();
latch.unlock();
// all threads terminate, then we're done.
for (auto& t : workers)
t.join();
}
void ThreadPool::thread_proc()
{
while (true)
{
std::unique_lock<std::mutex> latch(queue_mutex);
cv_task.wait(latch, [this](){ return stop || !tasks.empty(); });
if (!tasks.empty())
{
// got work. set busy.
++busy;
// pull from queue
auto fn = tasks.front();
tasks.pop_front();
// release lock. run async
latch.unlock();
// run function outside context
fn();
++processed;
latch.lock();
--busy;
cv_finished.notify_one();
}
else if (stop)
{
break;
}
}
}
// generic function push
template<class F>
void ThreadPool::enqueue(F&& f)
{
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace_back(std::forward<F>(f));
cv_task.notify_one();
}
// waits until the queue is empty.
void ThreadPool::waitFinished()
{
std::unique_lock<std::mutex> lock(queue_mutex);
cv_finished.wait(lock, [this](){ return tasks.empty() && (busy == 0); });
}
// a cpu-busy task.
void work_proc()
{
std::random_device rd;
std::mt19937 rng(rd());
// build a vector of random numbers
std::vector<int> data;
data.reserve(100000);
std::generate_n(std::back_inserter(data), data.capacity(), [&](){ return rng(); });
std::sort(data.begin(), data.end(), std::greater<int>());
}
int main()
{
ThreadPool tp;
// run five batches of 100 items
for (int x=0; x<5; ++x)
{
// queue 100 work tasks
for (int i=0; i<100; ++i)
tp.enqueue(work_proc);
tp.waitFinished();
std::cout << tp.getProcessed() << '\n';
}
// destructor will close down thread pool
return EXIT_SUCCESS;
}
Run Code Online (Sandbox Code Playgroud)
产量
100
200
300
400
500
Run Code Online (Sandbox Code Playgroud)
祝你好运.
归档时间: |
|
查看次数: |
5875 次 |
最近记录: |