fiz*_*zer 11 c++ oop multithreading design-patterns
我有一个主题,提供Subscribe(Observer*)
和Unsubscribe(Observer*)
提供给客户.Subject在其自己的线程中运行(从中调用Notify()
订阅的Observers),并且互斥锁保护其内部的Observers列表.
我希望客户端代码 - 我无法控制 - 能够在取消订阅后安全地删除Observer.怎么能实现这一目标?
编辑
一些说明性代码如下.问题是如何防止在"运行时出现问题"注释时发生取消订阅.然后我可以回拨一个已删除的对象.或者,如果我持有互斥锁而不是制作副本,我可以使某些客户端死锁.
#include <set>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
using namespace std;
using namespace boost;
class Observer
{
public:
void Notify() {}
};
class Subject
{
public:
Subject() : t(bind(&Subject::Run, this))
{
}
void Subscribe(Observer* o)
{
mutex::scoped_lock l(m);
observers.insert(o);
}
void Unsubscribe(Observer* o)
{
mutex::scoped_lock l(m);
observers.erase(o);
}
void Run()
{
for (;;)
{
WaitForSomethingInterestingToHappen();
set<Observer*> notifyList;
{
mutex::scoped_lock l(m);
notifyList = observers;
}
// Problem here
for_each(notifyList.begin(), notifyList.end(),
mem_fun(&Observer::Notify));
}
}
private:
set<Observer*> observers;
thread t;
mutex m;
};
Run Code Online (Sandbox Code Playgroud)
编辑
由于死锁风险,我无法在持有互斥锁时通知观察者.这种情况最明显的方式 - 客户端从Notify内部调用Subscribe或Unsubscribe - 可以通过使互斥锁递归来轻松解决.更隐蔽的是不同线程上间歇性死锁的风险.
我处于多线程环境中,因此在线程执行的任何时候,它通常都会保存一系列锁L1,L2,... Ln.另一个线程将保持锁K1,K2,... Km.正确编写的客户端将确保不同的线程始终以相同的顺序获取锁.但是当客户端与我的Subject的互斥体交互时 - 称之为X - 这个策略将被破坏:调用订阅/取消订阅以L1,L2,... Ln,X的顺序获取锁定.从我的主题线程调用通知获取锁定X,K1,K2,... Km的顺序.如果Li或Kj中的任何一个可以在任何呼叫路径上重合,则客户端会遇到间歇性死锁,几乎没有调试它的可能性.由于我不控制客户端代码,我不能这样做.
Unsubscribe()应该是同步的,因此在Observer被保证不再在Subject列表中之前它不会返回.这是安全地做到这一点的唯一方法.
ETA(将我的评论转移到答案):
由于时间似乎不是问题,因此在通知每个观察者之间取出并释放互斥锁.您将无法以现在的方式使用for_each,并且您必须检查迭代器以确保它仍然有效.
for ( ... )
{
take mutex
check iterator validity
notify
release mutex
}
Run Code Online (Sandbox Code Playgroud)
这将做你想要的.
“理想”的解决方案将涉及使用shared_ptr
和weak_ptr
。然而,为了通用,它还必须考虑Subject
在其某些内容之前被删除的问题Observer
(是的,这也可能发生)。
class Subject {
public:
void Subscribe(std::weak_ptr<Observer> o);
void Unsubscribe(std::weak_ptr<Observer> o);
private:
std::mutex mutex;
std::set< std::weak_ptr<Observer> > observers;
};
class Observer: boost::noncopyable {
public:
~Observer();
void Notify();
private:
std::mutex;
std::weak_ptr<Subject> subject;
};
Run Code Online (Sandbox Code Playgroud)
通过这种结构,我们创建了一个循环图,但明智地使用 ,weak_ptr
以便 和Observer
都Subject
可以在没有协调的情况下被破坏。
注意:为了简单起见,我假设Observer
一次观察一个对象Subject
,但它可以很容易地观察多个对象。
现在,您似乎陷入了不安全的内存管理之中。正如您可以想象的那样,这是一个相当困难的情况。在这种情况下,我建议一个实验:异步Unsubscribe
. Unsubscribe
或者至少,对外部的调用将是同步的,但会异步实现。
想法很简单:我们将使用事件队列来实现同步。那是:
Unsubscribe
在队列中发布事件(有效负载Observer*
)然后等待Subject
线程处理Unsubscribe
完事件后,它会唤醒等待的线程您可以使用忙等待或条件变量,我建议使用条件变量,除非性能另有要求。
注意:这个解决方案完全无法解释Subject
过早死亡的情况。