Joh*_*Doe 11 c++ multithreading gcc boost boost-asio
我有一个"主"功能,每个时间步一次执行许多小的,独立的任务.但是,在每个时间步之后,我必须等待所有任务完成才能前进.
我想让程序多线程化.我已尝试使用boost-offshoot线程池实现,我尝试使用(共享指针)线程的向量,我尝试了asio线程池的想法(使用io_service,建立一些工作,然后分发运行到线程和发布处理程序到io_service).
所有这些似乎都有很多开销为我的"许多小任务"创建和销毁线程,我想要一种方法,最好使用asio工具,实例化一个io_service,一个thread_group,将处理程序发布到io_service,等待在发布更多任务之前完成单个时间步的工作.有没有办法做到这一点?这是我现在工作的(剥离)代码:
boost::asio::io_service io_service;
for(int theTime = 0; theTime != totalTime; ++theTime)
{
io_service.reset();
boost::thread_group threads;
// scoping to destroy the work object after work is finished being assigned
{
boost::asio::io_service::work work(io_service);
for (int i = 0; i < maxNumThreads; ++i)
{
threads.create_thread(boost::bind(&boost::asio::io_service::run,
&io_service));
}
for(int i = 0; i < numSmallTasks; ++i)
{
io_service.post(boost::bind(&process_data, i, theTime));
}
}
threads.join_all();
}
Run Code Online (Sandbox Code Playgroud)
这是我所拥有的(但不知道如何实现):
boost::asio::io_service io_service;
boost::thread_group threads;
boost::asio::io_service::work work(io_service);
for (int i = 0; i < maxNumThreads; ++i)
{
threads.create_thread(boost::bind(&boost::asio::io_service::run,
&io_service));
}
for(int theTime = 0; theTime != totalTime; ++theTime)
{
for(int i = 0; i < numSmallTasks; ++i)
{
io_service.post(boost::bind(&process_data, i, theTime));
}
// wait here until all of these tasks are finished before looping
// **** how do I do this? *****
}
// destroy work later and join all threads later...
Run Code Online (Sandbox Code Playgroud)
Ros*_*ost 11
您可以使用期货进行数据处理并使用它们进行同步boost::wait_for_all().这将允许您根据完成的工作部分而不是线程进行操作.
int process_data() {...}
// Pending futures
std::vector<boost::unique_future<int>> pending_data;
for(int i = 0; i < numSmallTasks; ++i)
{
// Create task and corresponding future
// Using shared ptr and binding operator() trick because
// packaged_task is non-copyable, but asio::io_service::post requires argument to be copyable
// Boost 1.51 syntax
// For Boost 1.53+ or C++11 std::packaged_task shall be boost::packaged_task<int()>
typedef boost::packaged_task<int> task_t;
boost::shared_ptr<task_t> task = boost::make_shared<task_t>(
boost::bind(&process_data, i, theTime));
boost::unique_future<int> fut = task->get_future();
pending_data.push_back(std::move(fut));
io_service.post(boost::bind(&task_t::operator(), task));
}
// After loop - wait until all futures are evaluated
boost::wait_for_all(pending_data.begin(), pending_data.end());
Run Code Online (Sandbox Code Playgroud)