使用GUI和工作线程增强Asio模式

Enr*_*oma 5 c++ boost-asio

我想使用一个用于GUI的线程和一个用于某个套接字IO的工作线程来实现Boost Asio模式.

工作线程将用于boost::asio::io_service管理套接字客户端.套接字上的所有操作仅由工作线程执行.

GUI线程需要从工作线程发送和接收消息.

我无法确定如何使用Boost Asio实现此模式.

我已经用标准的Asio方式实现了套接字通信(我io_service.run()从工作线程调用并使用async_read_some/ async_send).我不需要strands因为io_service.run()只是从工作线程调用.

现在我正在尝试添加跨线程消息队列.我该如何实施呢?

我应该runio_service从GUI线程呢?

或者我应该使用strandswith post将消息从GUI线程发布到工作线程(不调用io_service.run()io_service.poll_one()从GUI线程),并使用操作系统的GUI消息循环将消息从工作线程发布到GUI线程?

如果我还需要调用io_service.run()io_service.poll_one()从GUI线程调用,我是否需要strands在套接字操作上使用,因为它io_service是在两个线程之间共享的?

编辑:为了澄清我的问题,我想尽我所能,使用Boost Asio来实现消息队列,只有当Boost Asio无法完成工作时才依赖其他库.

Tan*_*ury 9

消息传递相当通用.有多种方法可以解决问题,解决方案可能取决于所需的行为细节.例如,阻塞或非阻塞,控制内存分配,上下文等.

  • Boost.Lockfree为单/多消费者/生产者提供线程安全的无锁无阻塞队列.它倾向于非常适合于事件循环,在这种情况下,消费者被阻塞是不理想的,等待生产者发出同步结构的信号.

    boost::lockfree::queue<message_type> worker_message_queue;
    
    void send_worker_message(const message_type& message)
    {
      // Add message to worker message queue.
      worker_message_queue.push(message);
    
      // Add work to worker_io_service that will process the queue.
      worker_io_service.post(&process_message); 
    }
    
    void process_message()
    {
      message_type message;
    
      // If the message was not retrieved, then return early.
      if (!worker_message_queue.pop(message)) return;
    
      ...
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 可替代地,Boost.Asio的io_service可以作为一个队列的作用.消息只需要绑定到指定的处理程序.

    void send_worker_message(const message_type& message)
    {
      // Add work to worker_io_service that will process the message.
      worker_io_service.post(boost::bind(&process_message, message)); 
    }
    
    void process_message(message_type& message)
    {
      ...
    }
    
    Run Code Online (Sandbox Code Playgroud)

这条评论表明,欲望不仅仅是传递信息.听起来好像最终目标是允许一个线程使另一个线程调用任意函数.

如果是这种情况,那么考虑:

  • 使用Boost.Signals2进行托管信号和插槽实现.这允许任意函数注册信号.
  • 使用Boost.Asio io_service来设置信号发射.如果GUI线程和工作线程都有自己的io_service,那么工作线程可以将处理程序发布到io_service将发出信号的GUI线程中.在GUI线程的主循环中,它将轮询io_service,发出信号,并从GUI线程的上下文中调用槽.

这是完整的示例,其中两个线程将消息(作为unsigned int)传递给另一个,以及导致在另一个线程内调用任意函数.

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/signals2.hpp>
#include <boost/thread.hpp>

/// @brief io_service dedicated to gui.
boost::asio::io_service gui_service;

/// @brief io_service dedicated to worker.
boost::asio::io_service worker_service;

/// @brief work to keep gui_service from stopping prematurely.
boost::optional<boost::asio::io_service::work> gui_work;

/// @brief hello slot.
void hello(int x)
{
  std::cout << "hello with " << x << " from thread " << 
               boost::this_thread::get_id() << std::endl;
}

/// @brief world slot.
void world(int x)
{
  std::cout << "world with " << x << " from thread " << 
               boost::this_thread::get_id() << std::endl;
}

/// @brief Type for signals.
typedef boost::signals2::signal<void (int)> signal_type;

void emit_then_notify_gui(signal_type& signal, unsigned int x);

/// @brief Emit signals then message worker.
void emit_then_notify_worker(signal_type& signal, unsigned int x)
{
  // Emit signal, causing registered slots to run within this thread.
  signal(x);

  // If x has been exhausted, then cause gui service to run out of work.
  if (!x)
  {
    gui_work = boost::none;
  }
  // Otherwise, post work into worker service.
  else
  {
    std::cout << "GUI thread: " << boost::this_thread::get_id() << 
                 " scheduling other thread to emit signals" << std::endl;
    worker_service.post(boost::bind(
        &emit_then_notify_gui,
        boost::ref(signal), --x));
  }  
}

/// @brief Emit signals then message worker.
void emit_then_notify_gui(signal_type& signal, unsigned int x)
{
  // Emit signal, causing registered slots to run within this thread.
  signal(x);

  // If x has been exhausted, then cause gui service to run out of work.
  if (!x)
  {
    gui_work = boost::none;
  }
  // Otherwise, post more work into gui.
  else
  {
    std::cout << "Worker thread: " << boost::this_thread::get_id() << 
                 " scheduling other thread to emit signals" << std::endl;
    gui_service.post(boost::bind(
        &emit_then_notify_worker,
        boost::ref(signal), --x));
  }  
}

void worker_main()
{
  std::cout << "Worker thread: " << boost::this_thread::get_id() << std::endl;
  worker_service.run();
}

int main()
{
  signal_type signal;

  // Connect slots to signal.
  signal.connect(&hello);
  signal.connect(&world);

  boost::optional<boost::asio::io_service::work> worker_work(
     boost::ref(worker_service));
  gui_work = boost::in_place(boost::ref(gui_service));

  std::cout << "GUI thread: " << boost::this_thread::get_id() << std::endl;

  // Spawn off worker thread.
  boost::thread worker_thread(&worker_main);

  // Add work to worker.
  worker_service.post(boost::bind(
      &emit_then_notify_gui,
      boost::ref(signal), 3));

  // Mocked up GUI main loop.
  while (!gui_service.stopped())
  {
    // Do other GUI actions.

    // Perform message processing.
    gui_service.poll_one();
  }

  // Cleanup.
  worker_work = boost::none;
  worker_thread.join();
}
Run Code Online (Sandbox Code Playgroud)

它的输出:

GUI thread: b7f2f6d0
Worker thread: b7f2eb90
hello with 3 from thread b7f2eb90
world with 3 from thread b7f2eb90
Worker thread: b7f2eb90 scheduling other thread to emit signals
hello with 2 from thread b7f2f6d0
world with 2 from thread b7f2f6d0
GUI thread: b7f2f6d0 scheduling other thread to emit signals
hello with 1 from thread b7f2eb90
world with 1 from thread b7f2eb90
Worker thread: b7f2eb90 scheduling other thread to emit signals
hello with 0 from thread b7f2f6d0
world with 0 from thread b7f2f6d0

  • @Enrico:`io_service`是线程安全的,因此不需要`strand`.但是,`strand`会提供相同的结果和一些额外的开销.`strand`维护一个[处理程序队列](http://stackoverflow.com/a/14320871/1053968),一次只将其一个处理程序发布到其`io_service`中[通过`dispatch()`]( http://stackoverflow.com/a/14414809/1053968).在上面的例子中,`dispatch()`将委托给`post()`.有关股线的详细信息以及何时需要,可能值得阅读[this](http://stackoverflow.com/a/12801042/1053968)答案. (2认同)