我想使用一个用于GUI的线程和一个用于某个套接字IO的工作线程来实现Boost Asio模式.
工作线程将用于boost::asio::io_service管理套接字客户端.套接字上的所有操作仅由工作线程执行.
GUI线程需要从工作线程发送和接收消息.
我无法确定如何使用Boost Asio实现此模式.
我已经用标准的Asio方式实现了套接字通信(我io_service.run()从工作线程调用并使用async_read_some/ async_send).我不需要strands因为io_service.run()只是从工作线程调用.
现在我正在尝试添加跨线程消息队列.我该如何实施呢?
我应该run在io_service从GUI线程呢?
或者我应该使用strandswith post将消息从GUI线程发布到工作线程(不调用io_service.run()或io_service.poll_one()从GUI线程),并使用操作系统的GUI消息循环将消息从工作线程发布到GUI线程?
如果我还需要调用io_service.run()或io_service.poll_one()从GUI线程调用,我是否需要strands在套接字操作上使用,因为它io_service是在两个线程之间共享的?
编辑:为了澄清我的问题,我想尽我所能,使用Boost Asio来实现消息队列,只有当Boost Asio无法完成工作时才依赖其他库.
消息传递相当通用.有多种方法可以解决问题,解决方案可能取决于所需的行为细节.例如,阻塞或非阻塞,控制内存分配,上下文等.
Boost.Lockfree为单/多消费者/生产者提供线程安全的无锁无阻塞队列.它倾向于非常适合于事件循环,在这种情况下,消费者被阻塞是不理想的,等待生产者发出同步结构的信号.
boost::lockfree::queue<message_type> worker_message_queue;
void send_worker_message(const message_type& message)
{
// Add message to worker message queue.
worker_message_queue.push(message);
// Add work to worker_io_service that will process the queue.
worker_io_service.post(&process_message);
}
void process_message()
{
message_type message;
// If the message was not retrieved, then return early.
if (!worker_message_queue.pop(message)) return;
...
}
Run Code Online (Sandbox Code Playgroud)可替代地,Boost.Asio的的io_service可以作为一个队列的作用.消息只需要绑定到指定的处理程序.
void send_worker_message(const message_type& message)
{
// Add work to worker_io_service that will process the message.
worker_io_service.post(boost::bind(&process_message, message));
}
void process_message(message_type& message)
{
...
}
Run Code Online (Sandbox Code Playgroud)这条评论表明,欲望不仅仅是传递信息.听起来好像最终目标是允许一个线程使另一个线程调用任意函数.
如果是这种情况,那么考虑:
io_service来设置信号发射.如果GUI线程和工作线程都有自己的io_service,那么工作线程可以将处理程序发布到io_service将发出信号的GUI线程中.在GUI线程的主循环中,它将轮询io_service,发出信号,并从GUI线程的上下文中调用槽.这是完整的示例,其中两个线程将消息(作为unsigned int)传递给另一个,以及导致在另一个线程内调用任意函数.
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/signals2.hpp>
#include <boost/thread.hpp>
/// @brief io_service dedicated to gui.
boost::asio::io_service gui_service;
/// @brief io_service dedicated to worker.
boost::asio::io_service worker_service;
/// @brief work to keep gui_service from stopping prematurely.
boost::optional<boost::asio::io_service::work> gui_work;
/// @brief hello slot.
void hello(int x)
{
std::cout << "hello with " << x << " from thread " <<
boost::this_thread::get_id() << std::endl;
}
/// @brief world slot.
void world(int x)
{
std::cout << "world with " << x << " from thread " <<
boost::this_thread::get_id() << std::endl;
}
/// @brief Type for signals.
typedef boost::signals2::signal<void (int)> signal_type;
void emit_then_notify_gui(signal_type& signal, unsigned int x);
/// @brief Emit signals then message worker.
void emit_then_notify_worker(signal_type& signal, unsigned int x)
{
// Emit signal, causing registered slots to run within this thread.
signal(x);
// If x has been exhausted, then cause gui service to run out of work.
if (!x)
{
gui_work = boost::none;
}
// Otherwise, post work into worker service.
else
{
std::cout << "GUI thread: " << boost::this_thread::get_id() <<
" scheduling other thread to emit signals" << std::endl;
worker_service.post(boost::bind(
&emit_then_notify_gui,
boost::ref(signal), --x));
}
}
/// @brief Emit signals then message worker.
void emit_then_notify_gui(signal_type& signal, unsigned int x)
{
// Emit signal, causing registered slots to run within this thread.
signal(x);
// If x has been exhausted, then cause gui service to run out of work.
if (!x)
{
gui_work = boost::none;
}
// Otherwise, post more work into gui.
else
{
std::cout << "Worker thread: " << boost::this_thread::get_id() <<
" scheduling other thread to emit signals" << std::endl;
gui_service.post(boost::bind(
&emit_then_notify_worker,
boost::ref(signal), --x));
}
}
void worker_main()
{
std::cout << "Worker thread: " << boost::this_thread::get_id() << std::endl;
worker_service.run();
}
int main()
{
signal_type signal;
// Connect slots to signal.
signal.connect(&hello);
signal.connect(&world);
boost::optional<boost::asio::io_service::work> worker_work(
boost::ref(worker_service));
gui_work = boost::in_place(boost::ref(gui_service));
std::cout << "GUI thread: " << boost::this_thread::get_id() << std::endl;
// Spawn off worker thread.
boost::thread worker_thread(&worker_main);
// Add work to worker.
worker_service.post(boost::bind(
&emit_then_notify_gui,
boost::ref(signal), 3));
// Mocked up GUI main loop.
while (!gui_service.stopped())
{
// Do other GUI actions.
// Perform message processing.
gui_service.poll_one();
}
// Cleanup.
worker_work = boost::none;
worker_thread.join();
}
Run Code Online (Sandbox Code Playgroud)
它的输出:
GUI thread: b7f2f6d0 Worker thread: b7f2eb90 hello with 3 from thread b7f2eb90 world with 3 from thread b7f2eb90 Worker thread: b7f2eb90 scheduling other thread to emit signals hello with 2 from thread b7f2f6d0 world with 2 from thread b7f2f6d0 GUI thread: b7f2f6d0 scheduling other thread to emit signals hello with 1 from thread b7f2eb90 world with 1 from thread b7f2eb90 Worker thread: b7f2eb90 scheduling other thread to emit signals hello with 0 from thread b7f2f6d0 world with 0 from thread b7f2f6d0