Emi*_*rik 5 c++ boost boost-mutex
我有一个c ++类,分配了大量的内存.它通过调用第三方库来实现这一点,如果它无法分配内存,则会崩溃,有时我的应用程序会在并行线程中创建我的类的多个实例.由于线程太多,我遇到了崩溃.我最好的解决方案是确保从不会有超过三个实例同时运行.(这是一个好主意吗?)和我为实现当前最好的办法即是使用升压互斥.以下伪代码的行,
MyClass::MyClass(){
my_thread_number = -1; //this is a class variable
while (my_thread_number == -1)
for (int i=0; i < MAX_PROCESSES; i++)
if(try_lock a mutex named i){
my_thread_number = i;
break;
}
//Now I know that my thread has mutex number i and it is allowed to run
}
MyClass::~MyClass(){
release mutex named my_thread_number
}
Run Code Online (Sandbox Code Playgroud)
如你所见,我不太确定互斥体的确切语法.所以总结一下,我的问题是
编辑:我意识到我在谈论线程,而不是进程.编辑:我参与构建可以在Linux和Windows上运行的应用程序...
更新我的另一个答案解决了线程之间的调度资源(在澄清问题之后).
它显示了一种信号量方法,用于协调(许多)工作人员之间的工作,以及a
thread_pool首先限制工人并将工作排队.
在linux(也许还有其他操作系统?)上,您可以使用锁定文件习惯用法(但某些文件系统和旧内核不支持它).
我建议使用Interprocess同步对象.
例如,使用名为信号量的Boost Interprocess:
#include <boost/interprocess/sync/named_semaphore.hpp>
#include <boost/thread.hpp>
#include <cassert>
int main()
{
using namespace boost::interprocess;
named_semaphore sem(open_or_create, "ffed38bd-f0fc-4f79-8838-5301c328268c", 0ul);
if (sem.try_wait())
{
std::cout << "Oops, second instance\n";
}
else
{
sem.post();
// feign hard work for 30s
boost::this_thread::sleep_for(boost::chrono::seconds(30));
if (sem.try_wait())
{
sem.remove("ffed38bd-f0fc-4f79-8838-5301c328268c");
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果你在后台开始一个副本,新副本将"拒绝"开始("哎呀,二次")大约30秒.
我觉得在这里扭转逻辑可能更容易.嗯.勒米试试.
一段时间过去了
呵呵.这比我想象的要复杂得多.
问题是,您希望确保在应用程序中断或终止时锁不会保留.为了分享便携式处理信号的技术:
#include <boost/interprocess/sync/named_semaphore.hpp>
#include <boost/thread.hpp>
#include <cassert>
#include <boost/asio.hpp>
#define MAX_PROCESS_INSTANCES 3
boost::interprocess::named_semaphore sem(
boost::interprocess::open_or_create,
"4de7ddfe-2bd5-428f-b74d-080970f980be",
MAX_PROCESS_INSTANCES);
// to handle signals:
boost::asio::io_service service;
boost::asio::signal_set sig(service);
int main()
{
if (sem.try_wait())
{
sig.add(SIGINT);
sig.add(SIGTERM);
sig.add(SIGABRT);
sig.async_wait([](boost::system::error_code,int sig){
std::cerr << "Exiting with signal " << sig << "...\n";
sem.post();
});
boost::thread sig_listener([&] { service.run(); });
boost::this_thread::sleep_for(boost::chrono::seconds(3));
service.post([&] { sig.cancel(); });
sig_listener.join();
}
else
{
std::cout << "More than " << MAX_PROCESS_INSTANCES << " instances not allowed\n";
}
}
Run Code Online (Sandbox Code Playgroud)
那里有很多可以解释的东西.如果您有兴趣,请告诉我.
注意很明显,如果
kill -9在你的应用程序上使用(强制终止),那么所有的赌注都会关闭,你必须删除Name Semaphore对象或明确解锁它(post()).
这是我系统上的一个测试:
sehe@desktop:/tmp$ (for a in {1..6}; do ./test& done; time wait)
More than 3 instances not allowed
More than 3 instances not allowed
More than 3 instances not allowed
Exiting with signal 0...
Exiting with signal 0...
Exiting with signal 0...
real 0m3.005s
user 0m0.013s
sys 0m0.012s
Run Code Online (Sandbox Code Playgroud)
这是实现您自己的“信号量”的一种简单方法(因为我认为标准库或 boost 没有)。这选择了“合作”方式,工人将互相等待:
#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
void the_work(int id)
{
static int running = 0;
std::cout << "worker " << id << " entered (" << running << " running)\n";
static mutex mx;
static condition_variable cv;
// synchronize here, waiting until we can begin work
{
unique_lock<mutex> lk(mx);
cv.wait(lk, phoenix::cref(running) < 3);
running += 1;
}
std::cout << "worker " << id << " start work\n";
this_thread::sleep_for(chrono::seconds(2));
std::cout << "worker " << id << " done\n";
// signal one other worker, if waiting
{
lock_guard<mutex> lk(mx);
running -= 1;
cv.notify_one();
}
}
int main()
{
thread_group pool;
for (int i = 0; i < 10; ++i)
pool.create_thread(bind(the_work, i));
pool.join_all();
}
Run Code Online (Sandbox Code Playgroud)
现在,我想说,最好有一个由 n 个工作人员组成的专用池,轮流从队列中获取工作:
#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
class thread_pool
{
private:
mutex mx;
condition_variable cv;
typedef function<void()> job_t;
std::deque<job_t> _queue;
thread_group pool;
boost::atomic_bool shutdown;
static void worker_thread(thread_pool& q)
{
while (auto job = q.dequeue())
(*job)();
}
public:
thread_pool() : shutdown(false) {
for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
pool.create_thread(bind(worker_thread, ref(*this)));
}
void enqueue(job_t job)
{
lock_guard<mutex> lk(mx);
_queue.push_back(std::move(job));
cv.notify_one();
}
optional<job_t> dequeue()
{
unique_lock<mutex> lk(mx);
namespace phx = boost::phoenix;
cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
if (_queue.empty())
return none;
auto job = std::move(_queue.front());
_queue.pop_front();
return std::move(job);
}
~thread_pool()
{
shutdown = true;
{
lock_guard<mutex> lk(mx);
cv.notify_all();
}
pool.join_all();
}
};
void the_work(int id)
{
std::cout << "worker " << id << " entered\n";
// no more synchronization; the pool size determines max concurrency
std::cout << "worker " << id << " start work\n";
this_thread::sleep_for(chrono::seconds(2));
std::cout << "worker " << id << " done\n";
}
int main()
{
thread_pool pool; // uses 1 thread per core
for (int i = 0; i < 10; ++i)
pool.enqueue(bind(the_work, i));
}
Run Code Online (Sandbox Code Playgroud)
附言。如果您愿意,可以使用 C++11 lambda 代替 boost::phoenix。