boost :: asio :: socket线程安全

rav*_*int 16 c++ boost boost-asio

(这是我原始问题的简化版)

我有几个线程写入boost asio socket.这看起来效果很好,没有任何问题.

文档说共享套接字不是线程安全的(这里,在底部向下)所以我想知道我是否应该使用互斥锁等保护套接字.

这个问题坚持认为保护是必要的,但没有就如何保护提出建议.

我原来问题的所有答案也坚持我做的很危险,并且大多数人都敦促我用async_writes或更复杂的东西替换我的写作.但是,我不愿意这样做,因为它会使已经正常工作的代码变得复杂,并且没有一个回答者说服他们知道他们所说的内容 - 他们似乎已经阅读了与我相同的文档并且正在猜测,就像我一样是.

所以,我编写了一个简单的程序来强调测试从两个线程写入共享套接字.

这是服务器,它只是写出从客户端收到的任何内容

int main()
{
    boost::asio::io_service io_service;

    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3001));

    tcp::socket socket(io_service);
    acceptor.accept(socket);

    for (;;)
    {
        char mybuffer[1256];
        int len = socket.read_some(boost::asio::buffer(mybuffer,1256));
        mybuffer[len] = '\0';
        std::cout << mybuffer;
        std::cout.flush();

    }

  return 0;
}
Run Code Online (Sandbox Code Playgroud)

这是客户端,它创建两个线程,尽可能快地写入共享套接字

boost::asio::ip::tcp::socket * psocket;

void speaker1()
{
    string msg("speaker1: hello, server, how are you running?\n");
    for( int k = 0; k < 1000; k++ ) {
        boost::asio::write(
            *psocket,boost::asio::buffer(msg,msg.length()));
    }

}
void speaker2()
{
    string msg("speaker2: hello, server, how are you running?\n");
    for( int k = 0; k < 1000; k++ ) {
        boost::asio::write(
            *psocket,boost::asio::buffer(msg,msg.length()));
    }

}

int main(int argc, char* argv[])
{

    boost::asio::io_service io_service;

  // connect to server

    tcp::resolver resolver(io_service);
    tcp::resolver::query query("localhost", "3001");
    tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
    tcp::resolver::iterator end;
    psocket = new tcp::socket(io_service);
    boost::system::error_code error = boost::asio::error::host_not_found;
    while (error && endpoint_iterator != end)
    {
        psocket->close();
        psocket->connect(*endpoint_iterator++, error);
    }


    boost::thread t1( speaker1 );
    boost::thread t2( speaker2 );

    Sleep(50000);

}
Run Code Online (Sandbox Code Playgroud)

这有效!完全可以,据我所知.客户端不会崩溃.消息到达服务器时没有乱码.它们通常交替到达,每个线程一个.有时一个线程会在另一个线程之前收到两到三条消息,但只要没有乱码并且所有消息都到达,我认为这不是问题.

我的结论:在一些理论意义上,套接字可能不是线程安全的,但它很难让它失败,我不会担心它.

Sam*_*ler 7

使用boost::asio::io_service::strand非线程安全的异步处理程序.

strand被定义为事件处理程序的严格顺序调用(即没有并发调用).使用strands允许在多线程程序中执行代码,而无需显式锁定(例如使用互斥锁).

计时器教程可能是环绕股头部最简单的方法.


Cli*_*max 6

在重新编写async_write的代码之后,我现在确信任何写操作都是线程安全的,当且仅当数据包大小小于

default_max_transfer_size = 65536;
Run Code Online (Sandbox Code Playgroud)

会发生的是,只要调用async_write,就会在同一个线程中调用async_write_some.调用某种形式的io_service :: run的池中的任何线程将继续为该写操作调用async_write_some,直到它完成.

如果必须多次调用(数据包大于65536),这些async_write_some调用可以并将进行交错.

ASIO不像你期望的那样将写入队列排队到一个套接字,一个接着完成.为了确保线程交错安全写入,请考虑以下代码:

    void my_connection::async_serialized_write(
            boost::shared_ptr<transmission> outpacket) {
        m_tx_mutex.lock();
        bool in_progress = !m_pending_transmissions.empty();
        m_pending_transmissions.push(outpacket);
        if (!in_progress) {
            if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
                boost::asio::async_write(m_socket,
                    m_pending_transmissions.front()->scatter_buffers,
                        boost::asio::transfer_all(),
            boost::bind(&my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                                       boost::asio::placeholders::bytes_transferred));
            } else { // Send single buffer
                boost::asio::async_write(m_socket,
                                    boost::asio::buffer(
                                           m_pending_transmissions.front()->buffer_references.front(),                          m_pending_transmissions.front()->num_bytes_left),
                boost::asio::transfer_all(),
                boost::bind(
                        &my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
            }
        }
        m_tx_mutex.unlock();
    }

    void my_connection::handle_async_serialized_write(
    const boost::system::error_code& e, size_t bytes_transferred) {
        if (!e) {
            boost::shared_ptr<transmission> transmission;
            m_tx_mutex.lock();
            transmission = m_pending_transmissions.front();
            m_pending_transmissions.pop();
            if (!m_pending_transmissions.empty()) {
                if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
            boost::asio::async_write(m_socket,
                    m_pending_transmissions.front()->scatter_buffers,
                    boost::asio::transfer_exactly(
                            m_pending_transmissions.front()->num_bytes_left),
                    boost::bind(
                            &chreosis_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                } else { // Send single buffer
                    boost::asio::async_write(m_socket,
                    boost::asio::buffer(
                            m_pending_transmissions.front()->buffer_references.front(),
                            m_pending_transmissions.front()->num_bytes_left),
                    boost::asio::transfer_all(),
                    boost::bind(
                            &my_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                }
            }
            m_tx_mutex.unlock();
            transmission->handler(e, bytes_transferred, transmission);
        } else {
            MYLOG_ERROR(
            m_connection_oid.toString() << " " << "handle_async_serialized_write: " << e.message());
            stop(connection_stop_reasons::stop_async_handler_error);
        }
    }
Run Code Online (Sandbox Code Playgroud)

这基本上构成了一次发送一个数据包的队列.只有在第一次写入成功后才调用async_write,然后调用第一次写入的原始处理程序.

如果asio每个套接字/流自动写入队列,那会更容易.


Sea*_*ean 5

理解ASIO的关键是要意识到完成处理程序只能在调用的线程的上下文中运行,io_service.run()无论哪个线程调用异步方法.如果您只调用io_service.run()了一个线程,那么所有完成处理程序将在该线程的上下文中串行执行.如果您调用io_service.run()了多个线程,那么完成处理程序将在其中一个线程的上下文中执行.您可以将此视为一个线程池,其中池中的线程是调用io_service.run()同一io_service对象的线程.

如果你有多个线程调用,io_service.run()那么你可以强制完成处理程序通过将它们放入一个序列化strand.

要回答问题的最后部分,您应该致电boost::async_write().这将把写操作分派到已调用的线程上,io_service.run()并在写完成后调用完成处理程序.如果你需要序列化这个操作,那么它有点复杂,你应该阅读这里的文档.


Arv*_*vid 5

听起来这个问题归结为:

async_write_some()从两个不同的线程在单个套接字上同时调用时会发生什么

我相信这正是非线程安全的操作.这些缓冲区将在线路上发出的顺序是未定义的,甚至可能是交错的.特别是如果你使用便利功能async_write(),因为它被实现为对async_write_some()下面的一系列调用,直到整个缓冲区被发送.在这种情况下,从两个线程发送的每个片段可以随机交织.

保护您免受这种情况影响的唯一方法是构建您的程序以避免这种情况.

一种方法是编写一个应用程序层发送缓冲区,单个线程负责将其推送到套接字上.这样你就可以只保护发送缓冲区本身.请记住,虽然简单std::vector不起作用,因为在最后添加字节可能最终重新分配它,可能在有一个未完成的async_write_some()引用它时.相反,使用缓冲区的链接列表并使用asio的分散/聚集功能可能是个好主意.