c ++ lambda回调触发事件

ari*_*_JC 2 c++ lambda callback c++11

我一直试图围绕c ++中的回调功能.我想要实现的目标如下:

我有两个对象,每个对象都有自己的线程.一个对象A具有指向第二个对象的指针B.见例子:

class A
{
  public:
   // ...
  private:
   std::unique_ptr<B> b;
};

class B
{
  public:
   void add_message(MessageType msg);
   // ...
};
Run Code Online (Sandbox Code Playgroud)

我想要实现的是让对象A使用指针添加消息B然后继续执行其他操作,但是有回调或处理程序或者在B回复该消息时触发的内容.B对消息进行一些处理并将其传递给其他对象以便在其自己的线程上进行处理,但最终会得到一个回复​​.那么我怎么知道什么时候B回复我的消息,例如:

// In class A
MessageType m();
b->add_message(m)
// A's thread continues doing other stuff
...
// some notification that b has a reply?
Run Code Online (Sandbox Code Playgroud)

我知道我可能不得不使用std :: function作为我想要使用的回调但是我不能通过查看已经有很多例子来了解如何做到这一点.任何帮助是值得赞赏的,并注意到我已经看了很多例子,但不能把它绑回到我想要实现的目标或者不理解......

Yak*_*ont 5

线程是执行的序列.它们的行为大致类似于线性C++程序,嵌入在内存模型中,允许它们进行通信并注意由其他执行线程引起的状态更改.

如果没有线程的合作,对线程的回调不能接管一系列执行.您要通知的线程必须明确检查消息是否已到达并处理它.


处理消息响应有两种常用方法.

第一种是std::future类似的方法.在其中,调用者获得某种类型的标记,该标记表示将来可能或将要生成的答案.

第二种是再次使用消息传递.您向B发送消息,请求回复.B向A发回包含响应的消息.与B接收消息的方式相同,A接收消息.该消息可能包含某种"返回目标"以帮助A将其链接到原始消息.

在基于消息的系统中,通常有一个"事件循环".你有一个重复返回"事件循环"的线程,而不是一个大型的线性程序.在那里它检查队列中的消息,如果没有,则等待一些消息.

必须在这样的系统下将任务分解为一口大小的块,以便您经常检查事件循环以获得响应.

一种方法是使用协同程序,一种执行状态,而不拥有自己的执行程序(就像一个拥有两者的线程).协同程序会定期放弃优先级并"将状态保存到以后".


未来的解决方案通常是最简单的,但它依赖于A定期检查响应.

首先,a threaded_queue<T>,允许任意数量的生产者和消费者将事物传递到队列中并从前面吃掉它们:

template<class T>
struct threaded_queue {
  using lock = std::unique_lock<std::mutex>;
  void push_back( T t ) {
    {
      lock l(m);
      data.push_back(std::move(t));
    }
    cv.notify_one();
  }
  boost::optional<T> pop_front() {
    lock l(m);
    cv.wait(l, [this]{ return abort || !data.empty(); } );
    if (abort) return {};
    auto r = std::move(data.back());
    data.pop_back();
    return std::move(r);
  }
  void terminate() {
    {
      lock l(m);
      abort = true;
      data.clear();
    }
    cv.notify_all();
  }
  ~threaded_queue()
  {
    terminate();
  }
private:
  std::mutex m;
  std::deque<T> data;
  std::condition_variable cv;
  bool abort = false;
};
Run Code Online (Sandbox Code Playgroud)

现在,我们希望将任务传递到这样的队列中,并让传递任务的人获得结果.以下是打包任务的上述用法:

template<class...Args>
struct threaded_task_queue {
  threaded_task_queue() = default;
  threaded_task_queue( threaded_task_queue&& ) = delete;
  threaded_task_queue& operator=( threaded_task_queue&& ) = delete;
  ~threaded_task_queue() = default;
  template<class F, class R=std::result_of_t<F&(Args...)>>
  std::future<R> queue_task( F task ) {
    std::packaged_task<R(Args...)> p(std::move(task));
    auto r = p.get_future();
    tasks.push_back( std::move(p) );
    return r;
  }
  void terminate() {
    tasks.terminate();
  }
  std::function<void(Args...)> pop_task() {
    auto task = tasks.pop_front();
    if (!task) return {};
    auto task_ptr = std::make_shared<std::packaged_task<R(Args...)>>(std::move(*task));
    return [task_ptr](Args...args){
      (*task_ptr)(std::forward<Args>(args)...);
    };
  }
private:
  threaded_queue<std::packaged_task<void(Args...)>> tasks;
};
Run Code Online (Sandbox Code Playgroud)

如果我做得对,它的工作原理如下:

  • A以lambda的形式向B发送任务队列.这个lambda采用一些固定的参数集(由B提供),并返回一些值.

  • B弹出队列,并获取一个std::function接受参数的队列.它调用它; 它void在B的上下文中返回.

  • A future<R>在排队任务时给出了一个.它可以查询它以查看它是否已完成.

你会注意到A不能被"通知"事情已经完成.这需要不同的解决方案.但是如果A最终达到无法等待B的结果就无法前进的程度,这个系统就可以工作了.

另一方面,如果A积累大量此类消息,有时需要等待来自许多此类B的输入,直到其中任何一个返回数据(或用户执行某些操作),您需要比a更先进的东西std::future<R>.一般模式 - 具有表示将要交付的未来计算的令牌 - 是可靠的.但是你需要增加它以适应未来计算和消息循环等的多个来源.

代码未经测试.

threaded_task_queue发送邮件的一种方法是:

template<class Signature>
struct message_queue;
template<class R, class...Args>
struct message_queue<R(Args...) :
  threaded_task_queue< std::function<R(Args...)> >
{
  std::future<R> queue_message(Args...args) {
    return this->queue_task(
      [tup = std::make_tuple(std::forward<Args>(args)...)]
      ( std::function<R(Args...)> f ) mutable
      {
        return std::apply( f, std::move(tup) );
      }
    );
  }
  bool consume_message( std::function<R(Args...)> f )
  {
    auto task = pop_task();
    if (!task) return false;
    task( std::move(f) );
    return true;
  }
};
Run Code Online (Sandbox Code Playgroud)

在提供者方面,您提供Args...,在消费者方面,您消费Args...和返回R,在提供者方面,您可以future<R>在消费者完成后获得结果.

这可能比threaded_task_queue我写的原始文件更自然.

std::apply 是C++ 17但是C++ 11和C++ 14的实现很普遍.