Boost::beast:多个 async_write 调用引发断言错误

The*_*ist 5 c++ boost boost-asio websocket boost-beast

我正在为我的全双工服务器编写测试,当我执行多个(连续)调用(尽管被一条线覆盖)时,我从文件中async_write收到以下断言错误:boost::beastboost/beast/websocket/detail/stream_base.hpp

// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);
Run Code Online (Sandbox Code Playgroud)

要在您的计算机上重现该问题:可以在此处找到重现此问题 (MCVE) 的完整客户端代码。它在链接中不起作用,因为您需要一台服务器(在您自己的机器上,抱歉,因为不可能在线方便地执行此操作,这更好地客观地表明问题出在客户端,而不是服务器,如果我把它包括在这里)。我使用websocketd使用命令创建一个服务器./websocketd --ssl --sslkey /path/to/server.key --sslcert /path/to/server.crt --port=8085 ./prog.py,其中./prog.pyis 一个简单的 python 程序,用于打印和刷新(我从websocketd 主页获取它)。

在客户端中进行写入的调用如下所示:

  std::vector<std::vector<std::future<void>>> clients_write_futures(
      clients_count);
  for (int i = 0; i < clients_count; i++) {
    clients_write_futures[i] = std::vector<std::future<void>>(num_of_messages);
    for (int j = 0; j < num_of_messages; j++) {
      clients_write_futures[i][j] =
          clients[i]->write_data_async_future("Hello"); // writing here
    }
  }
Run Code Online (Sandbox Code Playgroud)

请注意,我在示例中仅使用 1 个客户端。客户端数组只是测试时对服务器造成更大压力的概括。

我对这个问题的评论:

  1. 循环是顺序的;这不像我在多个线程中执行此操作
  2. 应该可以以全双工形式进行通信,其中将不定数量的消息发送到服务器。还可以如何完成全双工通信?
  3. 我使用链来包装我的异步调用,以防止通过 io_service/io_context 发生套接字中的任何冲突
  4. 使用调试器对此进行调查表明,循环的第二次迭代始终失败,这意味着我正在做一些根本错误的事情,但我不知道它是什么。换句话说:这显然是一个确定性问题。

我在这里做错了什么?如何向我的 websocket 服务器写入无限数量的消息?


编辑:

Sehe,我想首先对代码混乱表示歉意(没有意识到它有那么糟糕),并感谢您为此付出的努力。我希望你问我为什么它以这种(可能)有组织且混乱的方式同时构建,答案很简单:主要是一个 gtest 代码,用于查看我的通用、多功能 websocket 客户端是否可以工作,我用它来强调 -测试我的服务器(它使用大量的多线程 io_service 对象,我认为这是敏感的并且需要广泛的测试)。我计划在实际生产测试期间同时用许多客户端轰炸我的服务器。我发布这个问题是因为我不理解客户的行为。我在这个文件中所做的是创建一个 MCVE(人们一直要求这样做)。我花了两个小时来剥离我的代码来创建它,最终我复制了我的 gtest 测试夹具代码(这是服务器上的夹具)并将其粘贴到主服务器中,并验证了问题仍然存在于另一台服务器上并清理了一点(显然这还不够)。

那么为什么我没有捕获异常呢?因为 gtest 会捕获它们并认为测试失败。主要不是生产代码,而是客户端。我从你提到的内容中学到了很多东西,我不得不说抛出和捕获是愚蠢的,但我不知道 std::make_exception_ptr(),所以我找到了我的(哑)方法来实现相同的结果:- )。为什么有太多无用的函数:在这个测试/示例中它们是无用的,但通常我可以稍后将它们用于其他用途,因为这个客户端不仅适用于这种情况。

现在回到问题:我不明白的是,async_write当它在主线程的循环中顺序使用时,为什么我们必须用strand_覆盖(我错误地表达了我只覆盖了处理程序)。我明白为什么处理程序被覆盖,因为套接字不是线程安全的,多线程io_service会在那里产生竞争。我们还知道io_service::post它本身是线程安全的(这就是为什么我认为不需要包装 async_write )。您能否解释一下,在执行此操作时我们需要包装 async_write 本身,这是什么不是线程安全的?我知道您已经知道这一点,但同样的断言仍在触发。我们对处理程序和异步队列进行了排序,但客户端仍然不满意进行多个写入调用。还可以缺少什么?

(顺便说一句,如果你写,然后得到未来,然后读,然后再写,它有效。这就是我使用 future 的原因,来准确定义测试用例并定义测试的时间顺序。我很偏执这里。)

seh*_*ehe 3

async_write用一根线遮住你的。但你没有做这样的事。您可以看到所做的就是将完成处理程序包装在该链中。但是您直接发布异步操作。

更糟糕的是,您是从主线程执行此操作,而与您的实例关联的任何线程上正在进行异步操作WSClient,这意味着您正在同时访问非线程安全的对象实例。

这是一场数据竞争,所以你会得到Undefined Behaviour

一个天真的修复可能是:

std::future<void> write_data_async_future(const std::string &data) {
    // shared_ptr is used to ensure data's survival
    std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
    std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();

    post(strand_, [=,self=shared_from_this()] {
        websock.async_write(
            boost::asio::buffer(*data_ptr),
            boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, self,
                                                          std::placeholders::_1, std::placeholders::_2, data_ptr,
                                                          write_promise)));
    });

    return write_promise->get_future();
}
Run Code Online (Sandbox Code Playgroud)

但这还不够。现在,您可以确定您的异步操作或其完成都不会同时运行,但您仍然可以在调用第一个异步操作的完成处理程序之前发布下一个异步操作。

要解决这个问题,您只需要排队即可。

老实说,我不确定您为什么如此关注使用 future 的同步。这使得实现这一目标变得非常困难。如果您可以描述您/功能上/想要实现的目标,我可以提出一个可能要短得多的解决方案。

代码审查注释

在我明白代码的含义之前,我花了很多时间阅读您的代码。我不想抢走我一路上做的笔记。

警告:这是一次相当漫长的代码潜水。我提供它是因为其中一些见解可能会帮助您了解需要如何重组代码。

我开始阅读异步代码链,直到on_handshake(这设置了started_promise)。

然后我走向了属于你的漩涡main职责。你的main函数就50行代码?!有几个并行容器并通过它们重复手动嵌套循环?

这是我经过一些重构后得到的:

int main() {
    std::vector<actor> actors(1);

    for (auto& a : actors) {
        a.client = std::make_shared<WSClient>();
        a.session_start_future = a.client->start("127.0.0.1", "8085");
        a.messages.resize(50);
    }

    for (auto& a : actors) { a.session_start_future.get(); }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.write_future = a.client->write_data_async_future("Hello");
    } }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.read_future = a.client->read_data_async_future();
    } }

    for (auto& a : actors) { for (auto& m : a.messages) {
        m.write_future.get();
        std::string result = m.read_future.get();
    } }
}
Run Code Online (Sandbox Code Playgroud)

所有数据结构都已折叠到小助手中actor

struct actor {
    std::shared_ptr<WSClient> client;
    std::future<void> session_start_future;

    struct message {
        std::string message = GenerateRandomString(20);
        std::future<void> write_future;
        std::future<std::string> read_future;
    };

    std::vector<message> messages;
};
Run Code Online (Sandbox Code Playgroud)

我们现在已经进行了大约一小时的代码审查,没有任何收获,除了我们现在可以告诉我们什么main做什么,并且有信心循环变量或其他东西不存在一些微不足道的错误。

拾起备份

写在开头:write_data_async_future. 等待。还有write_data_asyncwrite_data_sync。为什么?你会想读

更糟糕的是,WSClient仅将这些转发到假定的单个会话。WSClient为什么和之间有区别WSClientSession根本我说,没有。

进一步删除 30 行不太有用的代码,我们仍然遇到同样的失败,所以这很好。

我们刚刚说到哪了。write_data_async_future。哦,是的,我们需要非未来版本吗?不。所以,又少了 40 行代码。

现在,真正的write_data_async_future::

std::future<void> write_data_async_future(const std::string &data) {
    // shared_ptr is used to ensure data's survival
    std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
    std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
    websock.async_write(
        boost::asio::buffer(*data_ptr),
        boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, shared_from_this(),
                                                      std::placeholders::_1, std::placeholders::_2, data_ptr,
                                                      write_promise)));
    return write_promise->get_future();
}
Run Code Online (Sandbox Code Playgroud)

看起来……还不错。等等,有吗on_write_future?这可能意味着我们需要消除更多未使用的代码行。看着... 是的。噗,不见了。

到目前为止,diffstat 看起来像这样:

  test.cpp | 683 +++++++++++++++++++++++----------------------------------------
  1 file changed, 249 insertions(+), 434 deletions(-)
Run Code Online (Sandbox Code Playgroud)

回到该函数,让我们看一下on_write_future

void on_write_future(boost::system::error_code ec, std::size_t bytes_transferred,
                     std::shared_ptr<std::string> data_posted,
                     std::shared_ptr<std::promise<void> > write_promise) {
    boost::ignore_unused(bytes_transferred);
    boost::ignore_unused(data_posted);

    if (ec) {
        try {
            throw std::runtime_error("Error thrown while performing async write: " + ec.message());
        } catch (...) {
            write_promise->set_exception(std::current_exception());
        }
        return;
    }
    write_promise->set_value();
}
Run Code Online (Sandbox Code Playgroud)

有几个问题。所有过去的事情都被忽略了。我知道您传递共享指针的目的,但也许您应该将它们作为操作对象的一部分传递,以避免拥有这么多单独的共享指针。

抛出异常只是为了捕获它?嗯。对此我不确定。也许只是设置一个新的例外:

if (ec) {
    write_promise->set_exception(
            std::make_exception_ptr(std::system_error(ec, "async write failed")));
} else {
    write_promise->set_value();
}
Run Code Online (Sandbox Code Playgroud)

即便如此,现在仍然存在一个概念问题。get()您随意使用而不会陷入困境的方式main意味着任何连接中的任何错误都将中止所有操作。如果出现错误,只需中止一个连接/会话/客户端,这将非常有用。在您的代码中,它们都是非常同义词的(也与io_contextand thread)。

旁注:您将线程存储为成员,但始终将其分离。这意味着该会员从此就没用了。

此时我暂停了复习,碰巧我的灵机一动向我展示了这个问题。我的练习的半生不熟的结果就在这里。请注意,您不能使用它,因为它实际上并不能解决问题。但它可能在其他方面有帮助吗?