如何在多线程C++中拆除观察者关系?

fiz*_*zer 11 c++ oop multithreading design-patterns

我有一个主题,提供Subscribe(Observer*)Unsubscribe(Observer*)提供给客户.Subject在其自己的线程中运行(从中调用Notify()订阅的Observers),并且互斥锁保护其内部的Observers列表.

我希望客户端代码 - 我无法控制 - 能够在取消订阅后安全地删除Observer.怎么能实现这一目标?

  • 持有互斥锁 - 甚至是递归的互斥锁 - 当我通知观察者时,由于死锁风险而无法选择.
  • 我可以在Unsubscribe调用中标记要删除的观察者,并将其从主题线程中删除.然后,客户可以等待特殊的"安全删除"通知.这看起来很安全,但对客户来说很麻烦.

编辑

一些说明性代码如下.问题是如何防止在"运行时出现问题"注释时发生取消订阅.然后我可以回拨一个已删除的对象.或者,如果我持有互斥锁而不是制作副本,我可以使某些客户端死锁.

#include <set>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

using namespace std;
using namespace boost;

class Observer
{
public:
    void Notify() {}
};

class Subject
{
public:
    Subject() : t(bind(&Subject::Run, this))
    {
    }

    void Subscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.insert(o);
    }

    void Unsubscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.erase(o);
    }

    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            set<Observer*> notifyList;
            {
                mutex::scoped_lock l(m);
                notifyList = observers;
            }
            // Problem here
            for_each(notifyList.begin(), notifyList.end(), 
                     mem_fun(&Observer::Notify));
        }
    }

private:
    set<Observer*> observers;
    thread t;
    mutex m;
};
Run Code Online (Sandbox Code Playgroud)

编辑

由于死锁风险,我无法在持有互斥锁时通知观察者.这种情况最明显的方式 - 客户端从Notify内部调用Subscribe或Unsubscribe - 可以通过使互斥锁递归来轻松解决.更隐蔽的是不同线程上间歇性死锁的风险.

我处于多线程环境中,因此在线程执行的任何时候,它通常都会保存一系列锁L1,L2,... Ln.另一个线程将保持锁K1,K2,... Km.正确编写的客户端将确保不同的线程始终以相同的顺序获取锁.但是当客户端与我的Subject的互斥体交互时 - 称之为X - 这个策略将被破坏:调用订阅/取消订阅以L1,L2,... Ln,X的顺序获取锁定.从我的主题线程调用通知获取锁定X,K1,K2,... Km的顺序.如果Li或Kj中​​的任何一个可以在任何呼叫路径上重合,则客户端会遇到间歇性死锁,几乎没有调试它的可能性.由于我不控制客户端代码,我不能这样做.

Rob*_*b K 7

Unsubscribe()应该是同步的,因此在Observer被保证不再在Subject列表中之前它不会返回.这是安全地做到这一点的唯一方法.

ETA(将我的评论转移到答案):

由于时间似乎不是问题,因此在通知每个观察者之间取出并释放互斥锁.您将无法以现在的方式使用for_each,并且您必须检查迭代器以确保它仍然有效.

for ( ... )
{
    take mutex
    check iterator validity
    notify
    release mutex
}
Run Code Online (Sandbox Code Playgroud)

这将做你想要的.


Mat*_* M. 1

“理想”的解决方案将涉及使用shared_ptrweak_ptr。然而,为了通用,它还必须考虑Subject在其某些内容之前被删除的问题Observer(是的,这也可能发生)。

class Subject {
public:
    void Subscribe(std::weak_ptr<Observer> o);
    void Unsubscribe(std::weak_ptr<Observer> o);

private:
    std::mutex mutex;
    std::set< std::weak_ptr<Observer> > observers;
};

class Observer: boost::noncopyable {
public:
    ~Observer();

    void Notify();

private:
    std::mutex;
    std::weak_ptr<Subject> subject;
};
Run Code Online (Sandbox Code Playgroud)

通过这种结构,我们创建了一个循环图,但明智地使用 ,weak_ptr以便 和ObserverSubject可以在没有协调的情况下被破坏。

注意:为了简单起见,我假设Observer一次观察一个对象Subject,但它可以很容易地观察多个对象。


现在,您似乎陷入了不安全的内存管理之中。正如您可以想象的那样,这是一个相当困难的情况。在这种情况下,我建议一个实验:异步Unsubscribe. Unsubscribe或者至少,对外部的调用将是同步的,但会异步实现。

想法很简单:我们将使用事件队列来实现同步。那是:

  • 调用Unsubscribe在队列中发布事件(有效负载Observer*)然后等待
  • Subject线程处理Unsubscribe完事件后,它会唤醒等待的线程

您可以使用忙等待或条件变量,我建议使用条件变量,除非性能另有要求。

注意:这个解决方案完全无法解释Subject过早死亡的情况。