asio :: async_write在大容量流上难以同步

use*_*001 5 c++ multithreading mutex deadlock condition-variable

我目前正在使用Asio C ++库,并围绕它编写了一个客户端包装。我最初的方法非常基础,只需要向一个方向传送即可。需求已更改,我已切换为使用所有异步调用。除了以外,大多数迁移都很容易asio::async_write(...)。我使用了几种不同的方法,每种方法不可避免地陷入僵局。

该应用程序连续大量传输数据。我没有使用多线程,因为它们不会阻塞,并且可能导致内存问题,尤其是在服务器负载沉重时。作业将备份,并且应用程序堆会无限期地增长。

因此,我创建了一个阻塞队列,只是为了找出在回调之间使用锁或阻塞事件导致未知行为的困难方法。

包装器是一个非常大的类,因此,我将尝试解释我的现状,并希望能得到一些好的建议:

  • 我有一个asio::steady_timer按固定时间表运行的设备,可心跳消息直接入阻塞队列。
  • 专门用于读取事件并将其送到阻塞队列的线程
  • 专用于一个线程消耗的阻塞队列的

例如,在队列中,我有一个queue::block()queue::unblock(),它们只是条件变量/互斥锁的包装。

std::thread consumer([this]() {
    std::string message_buffer;

    while (queue.pop(message_buffer)) {
        queue.stage_block();
        asio::async_write(*socket, asio::buffer(message_buffer), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
        queue.block();

    }
});

void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
    queue.unblock();
}
Run Code Online (Sandbox Code Playgroud)

当套接字备份并且服务器由于当前负载而无法再接受数据时,队列将填满并导致死锁,handle_write(...)而永不调用该死锁。

另一种方法完全消除了使用者线程,并依赖于handle_write(...)弹出队列。像这样:

void networking::write(const std::string& data) {
    if (!queue.closed()) {
        std::stringstream stream_buffer;
        stream_buffer << data << std::endl;

        spdlog::get("console")->debug("pushing to queue {}", queue.size());

        queue.push(stream_buffer.str());

        if (queue.size() == 1) {
            spdlog::get("console")->debug("handle_write: {}", stream_buffer.str());
            asio::async_write(*socket, asio::buffer(stream_buffer.str()), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
        }
    }
}

void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
    std::string message;
    queue.pop(message);

    if (!queue.closed() && !queue.empty()) {
        std::string front = queue.front();

        asio::async_write(*socket, asio::buffer(queue.front()), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
    }
}
Run Code Online (Sandbox Code Playgroud)

这也导致了僵局,并且显然导致了其他种族问题。当我禁用心跳回调时,我绝对没有问题。但是,心跳是必需的。

我究竟做错了什么?有什么更好的方法?

use*_*001 1

看来我所有的痛苦完全来自于心跳。禁用异步写入操作的每个变体中的心跳似乎可以解决我的问题,因此这使我相信这可能是使用内置asio::async_wait(...)asio::steady_timer.

Asio 在内部同步其工作并等待作业完成后再执行下一个作业。使用asio::async_wait(...)来构建我的心跳功能是我的设计缺陷,因为它在等待挂起作业的同一线程上运行。当心跳等待时,它与 Asio 形成了僵局queue::push(...)。这可以解释为什么asio::async_write(...)完成处理程序在我的第一个示例中从未执行。

解决方案是将心跳放在自己的线程上,让它独立于Asio工作。我仍在使用阻塞队列来同步调用,asio::async_write(...)但已修改我的消费者线程以使用std::futurestd::promise。这将回调与我的消费者线程干净地同步。

std::thread networking::heartbeat_worker() {
    return std::thread([&]() {
        while (socket_opened) {

            spdlog::get("console")->trace("heartbeat pending");
            write(heartbeat_message);
            spdlog::get("console")->trace("heartbeat sent");

            std::unique_lock<std::mutex> lock(mutex);
            socket_closed_event.wait_for(lock, std::chrono::milliseconds(heartbeat_interval), [&]() {
                return !socket_opened;
            });
        }

        spdlog::get("console")->trace("heartbeat thread exited gracefully");
    });
}
Run Code Online (Sandbox Code Playgroud)