Server crashes when interrupted while sending a large chunk of data

Ayd*_*can 3 c++ sockets boost boost-asio

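My server crashes when I gracefully close a client that is connected to it while that client is receiving a large chunk of data. I suspect a lifetime bug, as with most bugs in Boost.Asio code, but I have not been able to pinpoint my mistake myself.

Each client establishes 2 connections with the server: one is used for syncing, and the other is a long-lived connection that receives continuous updates. In the "sync phase" the client receives a large amount of data to synchronize with the server state (the "state" is basically database data in JSON format). After syncing, the sync connection is closed. The client then receives updates to the database over the other connection (these are, of course, very small compared with the "sync data").

These are the relevant files:

connection.h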

#pragma once

#include <array>
#include <memory>
#include <string>
#include <boost/asio.hpp>

class ConnectionManager;

/// Represents a single connection from a client.
class Connection : public std::enable_shared_from_this<Connection>
{
public:
  Connection(const Connection&) = delete;
  Connection& operator=(const Connection&) = delete;

  /// Construct a connection with the given socket.
  explicit Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager);

  /// Start the first asynchronous operation for the connection.
  void start();

  /// Stop all asynchronous operations associated with the connection.
  void stop();

  /// Perform an asynchronous write operation.
  void do_write(const std::string& buffer);

  int getNativeHandle();

  ~Connection();

private:
  /// Perform an asynchronous read operation.
  void do_read();

  /// Socket for the connection.
  boost::asio::ip::tcp::socket socket_;

  /// The manager for this connection.
  ConnectionManager& connection_manager_;

  /// Buffer for incoming data.
  std::array<char, 8192> buffer_;

  std::string outgoing_buffer_;
};

typedef std::shared_ptr<Connection> connection_ptr;

connection.cpp

#include "connection.h"

#include <utility>
#include <vector>
#include <iostream>
#include <thread>

#include "connection_manager.h"

Connection::Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager)
    : socket_(std::move(socket))
    , connection_manager_(manager)
{
}

void Connection::start()
{
  do_read();
}

void Connection::stop()
{
  socket_.close();
}

Connection::~Connection()
{
}

void Connection::do_read()
{
  auto self(shared_from_this());
  socket_.async_read_some(boost::asio::buffer(buffer_), [this, self](boost::system::error_code ec, std::size_t bytes_transferred) {
        if (!ec) {
            std::string buff_str = std::string(buffer_.data(), bytes_transferred);
            const auto& tokenized_buffer = split(buff_str, ' ');
            
            if(!tokenized_buffer.empty() && tokenized_buffer[0] == "sync") {
                /// "syncing connection" sends a specific text
                /// hence I can separate between syncing and long-lived connections here and act accordingly.

                const auto& exec_json_strs = getExecutionJsons();
                const auto& order_json_strs = getOrdersAsJsons();
                const auto& position_json_strs = getPositionsAsJsons();
                const auto& all_json_strs = exec_json_strs + order_json_strs + position_json_strs + createSyncDoneJson();
                
                /// this is potentially a very large data.
                do_write(all_json_strs);
            }

            do_read();
        } else {
          connection_manager_.stop(shared_from_this());
        }
      });
}

void Connection::do_write(const std::string& write_buffer)
{
  outgoing_buffer_ = write_buffer;

  auto self(shared_from_this());
  boost::asio::async_write(socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()), [this, self](boost::system::error_code ec, std::size_t transfer_size) {
        if (!ec) {
           /// everything is fine.
        } else {
           /// what to do here?
           /// server crashes once I get error code 32 (EPIPE) here.
        }
      });
}

connection_manager.h

#pragma once

#include <set>
#include "connection.h"

/// Manages open connections so that they may be cleanly stopped when the server
/// needs to shut down.
class ConnectionManager
{
public:
  ConnectionManager(const ConnectionManager&) = delete;
  ConnectionManager& operator=(const ConnectionManager&) = delete;

  /// Construct a connection manager.
  ConnectionManager();

  /// Add the specified connection to the manager and start it.
  void start(connection_ptr c);

  /// Stop the specified connection.
  void stop(connection_ptr c);

  /// Stop all connections.
  void stop_all();

  void sendAllConnections(const std::string& buffer);

private:
  /// The managed connections.
  std::set<connection_ptr> connections_;
};

connection_manager.cpp

#include "connection_manager.h"

ConnectionManager::ConnectionManager()
{
}

void ConnectionManager::start(connection_ptr c)
{
  connections_.insert(c);
  c->start();
}

void ConnectionManager::stop(connection_ptr c)
{
    connections_.erase(c);
    c->stop();
}

void ConnectionManager::stop_all()
{
  for (auto c: connections_)
    c->stop();

  connections_.clear();
}

/// this function is used to keep clients up to date with the changes, not used during syncing phase.
void ConnectionManager::sendAllConnections(const std::string& buffer)
{
  for (auto c: connections_)
      c->do_write(buffer);
}

server.h

#pragma once

#include <boost/asio.hpp>
#include <string>
#include "connection.h"
#include "connection_manager.h"

class Server
{
public:
  Server(const Server&) = delete;
  Server& operator=(const Server&) = delete;

  /// Construct the server to listen on the specified TCP address and port, and
  /// serve up files from the given directory.
  explicit Server(const std::string& address, const std::string& port);

  /// Run the server's io_service loop.
  void run();

  void deliver(const std::string& buffer);

private:
  /// Perform an asynchronous accept operation.
  void do_accept();

  /// Wait for a request to stop the server.
  void do_await_stop();

  /// The io_service used to perform asynchronous operations.
  boost::asio::io_service io_service_;

  /// The signal_set is used to register for process termination notifications.
  boost::asio::signal_set signals_;

  /// Acceptor used to listen for incoming connections.
  boost::asio::ip::tcp::acceptor acceptor_;

  /// The connection manager which owns all live connections.
  ConnectionManager connection_manager_;

  /// The *NEXT* socket to be accepted.
  boost::asio::ip::tcp::socket socket_;
};

server.cpp

#include "server.h"
#include <signal.h>
#include <utility>

Server::Server(const std::string& address, const std::string& port)
    : io_service_()
    , signals_(io_service_)
    , acceptor_(io_service_)
    , connection_manager_()
    , socket_(io_service_)
{
  // Register to handle the signals that indicate when the server should exit.
  // It is safe to register for the same signal multiple times in a program,
  // provided all registration for the specified signal is made through Asio.
  signals_.add(SIGINT);
  signals_.add(SIGTERM);
#if defined(SIGQUIT)
  signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)

  do_await_stop();

  // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
  boost::asio::ip::tcp::resolver resolver(io_service_);
  boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve({address, port});
  acceptor_.open(endpoint.protocol());
  acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
  acceptor_.bind(endpoint);
  acceptor_.listen();

  do_accept();
}

void Server::run()
{
  // The io_service::run() call will block until all asynchronous operations
  // have finished. While the server is running, there is always at least one
  // asynchronous operation outstanding: the asynchronous accept call waiting
  // for new incoming connections.
  io_service_.run();
}

void Server::do_accept()
{
  acceptor_.async_accept(socket_,
      [this](boost::system::error_code ec)
      {
        // Check whether the server was stopped by a signal before this
        // completion handler had a chance to run.
        if (!acceptor_.is_open())
        {
          return;
        }

        if (!ec)
        {
          connection_manager_.start(std::make_shared<Connection>(
              std::move(socket_), connection_manager_));
        }

        do_accept();
      });
}

void Server::do_await_stop()
{
  signals_.async_wait(
      [this](boost::system::error_code /*ec*/, int /*signo*/)
      {
        // The server is stopped by cancelling all outstanding asynchronous
        // operations. Once all operations have finished the io_service::run()
        // call will exit.
        acceptor_.close();
        connection_manager_.stop_all();
      });
}

/// this function is used to keep clients up to date with the changes, not used during syncing phase.
void Server::deliver(const std::string& buffer)
{
    connection_manager_.sendAllConnections(buffer);
}   

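So, to repeat my question: my server crashes when I gracefully close a client connected to it while that client is receiving a large chunk of data, and I do not know why.

Edit: The crash happens in the async_write function once I receive the EPIPE error. The application is multithreaded. There are 4 threads that call Server::deliver with their own data as it is produced. deliver() is used to keep clients up to date; it has nothing to do with the initial sync, which is done with persistent data fetched from the database.

I have a single io_service, so I thought I would not need strands. io_service::run is called on the main thread, so the main thread is blocked.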

seh*_*ehe 7

Reviewing, adding some missing code bits:

namespace /*missing code stubs*/ {
    auto split(std::string_view input, char delim) {
        std::vector<std::string_view> result;
        boost::algorithm::split(result, input,
                                boost::algorithm::is_from_range(delim, delim));
        return result;
    }

    std::string getExecutionJsons()   { return ""; }
    std::string getOrdersAsJsons()    { return ""; }
    std::string getPositionsAsJsons() { return ""; }
    std::string createSyncDoneJson()  { return ""; }
}

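Now the things I notice are:

  1. You have a single io_service, so a single thread. Okay, so no strands should be required unless you have threads in your other code (main, for example?).

  2. A particular reason to suspect that threads are at play is that nobody could possibly call Server::deliver, because run() is blocking. This means that whenever you call deliver() now, it causes a data race, which is undefined behavior (UB).

     The casual comment

         /// this function is used to keep clients up to date with the changes,
         /// not used during syncing phase.

     does not do much to remove this concern. The code needs to defend against misuse. Comments do not get executed. Make it better:

         void Server::deliver(const std::string& buffer) {
             post(io_context_,
                  [this, buffer] { connection_manager_.broadcast(std::move(buffer)); });
         }

  3. You do not check that previous writes have completed before accepting a "new" one. This means that calling Connection::do_write results in undefined behavior for two reasons:

     • modifying outgoing_buffer_ during an ongoing asynchronous operation that uses that buffer is UB

     • having two overlapping async_write operations on the same IO object is UB (see the documentation)

     The typical way to fix this is to keep a queue of outgoing messages instead.

  4. Using async_read_some is rarely what you want, especially since the reads do not accumulate into a dynamic buffer. This means that if your packets get split at unexpected boundaries, you may not detect commands at all, or detect them incorrectly.

     Instead, consider asio::async_read_until with a dynamic buffer, for example:

     • read directly into a std::string so you do not have to copy the buffer into a string

     • read into a streambuf so you can parse with std::istream(&sbuf_) instead of tokenizing

  5. Concatenating all_json_strs, whose parts clearly have to be owning text containers, is wasteful. Instead, use a const-buffer-sequence to combine them all without copying.

     Better yet, consider a streaming approach to JSON serialization, so that not all of the JSON needs to be serialized in memory at any given time.

  6. Do not declare empty destructors (~Connection). They are pessimizations.

  7. Likewise for empty constructors (ConnectionManager). If you must, consider

         ConnectionManager::ConnectionManager() = default;

  8. getNativeHandle raises more questions for me about other code that may interfere. For example, it may indicate other libraries performing operations, which again can lead to overlapping reads/writes, or it could be a sign of more code living on threads (since Server::run() is by definition blocking).

  9. The connection manager should probably hold weak_ptr, so that Connections can eventually terminate. As it stands, the last reference is by definition held in the connection manager, meaning nothing ever gets destructed when the peer disconnects or the session fails for some other reason.

  10. This is not idiomatic:

         // Check whether the server was stopped by a signal before this
         // completion handler had a chance to run.
         if (!acceptor_.is_open()) {
             return;
         }

     If you closed the acceptor, the completion handler is invoked with error::operation_aborted anyway. Simply handle that, as in the final version I post later:

         // separate strand for each connection - just in case you ever add threads
         acceptor_.async_accept(
             make_strand(io_context_), [this](error_code ec, tcp::socket sock) {
                 if (!ec) {
                     connection_manager_.register_and_start(
                         std::make_shared<Connection>(std::move(sock),
                                                      connection_manager_));
                     do_accept();
                 }
             });

  11. I note this comment:

         // The server is stopped by cancelling all outstanding asynchronous
         // operations. Once all operations have finished the io_service::run()
         // call will exit.

     In fact, you never cancel() any operation on any IO object in your code. Again, comments are not executed. It is better to actually do what the comment says and let the destructors close the resources. This prevents spurious errors when objects are used after close, and it also prevents very annoying race conditions, for example when you closed the handle, some other thread re-opened a new stream on the same file descriptor, and you had handed out the handle to a third party (using getNativeHandle)... you see where this leads?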
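To make the point about async_read_some and message boundaries concrete, here is a minimal sketch in plain standard C++ (no Asio involved). It assumes commands are newline-terminated, which the question does not state. LineAssembler is a hypothetical helper, not an Asio type: it accumulates arbitrary chunks the way a dynamic buffer would and only hands back complete lines, which is roughly the service async_read_until provides.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper (not part of Asio): accumulates raw chunks as they
// arrive and hands back only complete, delimiter-terminated messages.
class LineAssembler {
  public:
    explicit LineAssembler(char delim = '\n') : delim_(delim) {}

    // Append one chunk (as delivered by a single read) and return every
    // complete message now available, without the delimiter.
    std::vector<std::string> feed(const std::string& chunk) {
        pending_.append(chunk);

        std::vector<std::string> complete;
        std::string::size_type start = 0;
        for (auto pos = pending_.find(delim_, start); pos != std::string::npos;
             pos = pending_.find(delim_, start)) {
            complete.emplace_back(pending_, start, pos - start);
            start = pos + 1;
        }

        assert(start <= pending_.size());
        pending_.erase(0, start); // keep only the unfinished tail
        return complete;
    }

    // Bytes received so far that do not yet form a complete message.
    const std::string& pending() const { return pending_; }

  private:
    char        delim_;
    std::string pending_;
};
```

If the text "sync me\n" arrives as the two chunks "sy" and "nc me\n", the first feed() call returns nothing and the second returns "sync me". Tokenizing each chunk on its own, as the original do_read does, would never see the "sync" command in that case.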

Reproducing the problem?

After reviewing the code this way, I tried to reproduce the problem, so I created fake data:

    std::string getExecutionJsons()   { return std::string(1024,  'E'); }
    std::string getOrdersAsJsons()    { return std::string(13312, 'O'); }
    std::string getPositionsAsJsons() { return std::string(8192,  'P'); }
    std::string createSyncDoneJson()  { return std::string(24576, 'C'); }

With some minor tweaks to the Connection class:

    std::string buff_str =
        std::string(buffer_.data(), bytes_transferred);
    const auto& tokenized_buffer = split(buff_str, ' ');

    if (!tokenized_buffer.empty() &&
        tokenized_buffer[0] == "sync") {
        std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
        /// "syncing connection" sends a specific text
        /// hence I can separate between syncing and long-lived
        /// connections here and act accordingly.

        const auto& exec_json_strs     = getExecutionJsons();
        const auto& order_json_strs    = getOrdersAsJsons();
        const auto& position_json_strs = getPositionsAsJsons();
        const auto& all_json_strs      = exec_json_strs +
            order_json_strs + position_json_strs +
            createSyncDoneJson();

        std::cerr << "All json length: " << all_json_strs.length() << std::endl;
        /// this is potentially a very large data.
        do_write(all_json_strs); // already on strand!
    }

We get the server output

sync detected on 127.0.0.1:43012
All json length: 47104
sync detected on 127.0.0.1:43044
All json length: 47104

and, using netcat as a fake client:

$ netcat localhost 8989 <<< 'sync me' > expected
^C
$ wc -c expected
47104 expected

Good. Now let's disconnect prematurely:

$ netcat localhost 8989 -w0 <<< 'sync me' > truncated
$ wc -c truncated
0 truncated

So it does lead to an early close, but the server still says

sync detected on 127.0.0.1:44176
All json length: 47104

Let's instrument do_write as well:

    async_write( //
        socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()),
        [/*this,*/ self](error_code ec, size_t transfer_size) {
            std::cerr << "do_write completion: " << transfer_size << " bytes ("
                      << ec.message() << ")" << std::endl;

            if (!ec) {
                /// everything is fine.
            } else {
                /// what to do here?
                // FIXME: probably cancel the read loop so the connection
                // closes?
            }
        });

Now we see:

sync detected on 127.0.0.1:44494
All json length: 47104
do_write completion: 47104 bytes (Success)
sync detected on 127.0.0.1:44512
All json length: 47104
do_write completion: 32768 bytes (Operation canceled)

That covers one "normal" connection and one disconnected one.

There is no sign of a crash or undefined behavior. Let's check with -fsanitize=address,undefined: a clean record, even after adding a heartbeat:

int main() {
    Server s("127.0.0.1", "8989");

    std::thread yolo([&s] {
        using namespace std::literals;
        int i = 1;

        do {
            std::this_thread::sleep_for(5s);
        } while (s.deliver("HEARTBEAT DEMO " + std::to_string(i++)));
    });

    s.run();

    yolo.join();
}

Conclusion

The only problems highlighted above that were not addressed are:

  • additional threading issues that were not shown (perhaps via getNativeHandle)

  • the fact that you can have overlapping writes in Connection::do_write. Fixing that:

    void Connection::write(std::string msg) { // public, might not be on the strand
        post(socket_.get_executor(),
             [self = shared_from_this(), msg = std::move(msg)]() mutable {
                 self->do_write(std::move(msg));
             });
    }

    void Connection::do_write(std::string msg) { // assumed on the strand
        outgoing_.push_back(std::move(msg));

        if (outgoing_.size() == 1)
            do_write_loop();
    }

    void Connection::do_write_loop() {
        if (outgoing_.size() == 0)
            return;

        auto self(shared_from_this());
        async_write( //
            socket_, boost::asio::buffer(outgoing_.front()),
            [this, self](error_code ec, size_t transfer_size) {
                std::cerr << "write completion: " << transfer_size << " bytes ("
                          << ec.message() << ")" << std::endl;

                if (!ec) {
                    outgoing_.pop_front();
                    do_write_loop();
                } else {
                    socket_.cancel();

                    // This would ideally be enough to free the connection, but
                    // since `ConnectionManager` doesn't use `weak_ptr` you need to
                    // force the issue using kind of an "umbilical cord reflux":
                    connection_manager_.stop(self);
                }
            });
    }

As you can see, I also split write/do_write to prevent off-strand invocation. The same goes for stop.
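The split between a public write and a private do_write only helps because every handler ends up running on one executor. As a rough model of that idea in plain standard C++ (this is not how Asio is implemented), SerialQueue below is a hypothetical stand-in for a single-threaded io_context: any thread may call post, but tasks only run, in FIFO order, on whichever thread calls run_pending. State that is touched only from inside tasks then needs no further locking.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for a single-threaded io_context: post() may be
// called from any thread, run_pending() only from the one "service" thread.
class SerialQueue {
  public:
    // Thread-safe: enqueue a task without running it.
    void post(std::function<void()> task) {
        assert(task != nullptr);
        std::lock_guard<std::mutex> lock(mutex_);
        tasks_.push_back(std::move(task));
    }

    // Run queued tasks in FIFO order on the calling thread, including tasks
    // posted by other tasks. Returns the number of tasks executed.
    std::size_t run_pending() {
        std::size_t executed = 0;
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (tasks_.empty())
                    return executed;
                task = std::move(tasks_.front());
                tasks_.pop_front();
            }
            task(); // run outside the lock so tasks may post() again
            ++executed;
        }
    }

  private:
    std::mutex                        mutex_;
    std::deque<std::function<void()>> tasks_;
};
```

In the real code this role is played by post(socket_.get_executor(), ...). The sketch only illustrates why funnelling the deliver() calls from the four producer threads through one queue removes the race on the outgoing buffer: the producers merely enqueue work, and the queue owner is the only one that touches the connection state.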

Full Listing

The full listing, including all the remarks/fixes above:

  • File connection.h

    #pragma once

    #include <boost/asio.hpp>

    #include <array>
    #include <deque>
    #include <memory>
    #include <string>
    using boost::asio::ip::tcp;

    class ConnectionManager;

    /// Represents a single connection from a client.
    class Connection : public std::enable_shared_from_this<Connection> {
      public:
        Connection(const Connection&) = delete;
        Connection& operator=(const Connection&) = delete;

        /// Construct a connection with the given socket.
        explicit Connection(tcp::socket socket, ConnectionManager& manager);

        void start();
        void stop();
        void write(std::string msg);

      private:
        void do_stop();
        void do_write(std::string msg);
        void do_write_loop();

        /// Perform an asynchronous read operation.
        void do_read();

        /// Socket for the connection.
        tcp::socket socket_;

        /// The manager for this connection.
        ConnectionManager& connection_manager_;

        /// Buffer for incoming data.
        std::array<char, 8192> buffer_;

        std::deque<std::string> outgoing_;
    };

    using connection_ptr = std::shared_ptr<Connection>;
  • File connection_manager.h

    #pragma once

    #include <list>
    #include "connection.h"

    /// Manages open connections so that they may be cleanly stopped when the server
    /// needs to shut down.
    class ConnectionManager {
      public:
        ConnectionManager(const ConnectionManager&) = delete;
        ConnectionManager& operator=(const ConnectionManager&) = delete;
        ConnectionManager() = default; // could be split across h/cpp if you wanted

        void register_and_start(connection_ptr c);
        void stop(connection_ptr c);
        void stop_all();

        void broadcast(const std::string& buffer);

        // purge defunct connections, returns remaining active connections
        size_t garbage_collect();

      private:
        using handle = std::weak_ptr<connection_ptr::element_type>;
        std::list<handle> connections_;
    };
  • File server.h

    #pragma once

    #include <boost/asio.hpp>
    #include <string>
    #include "connection.h"
    #include "connection_manager.h"

    class Server {
      public:
        Server(const Server&) = delete;
        Server& operator=(const Server&) = delete;

        /// Construct the server to listen on the specified TCP address and port,
        /// and serve up files from the given directory.
        explicit Server(const std::string& address, const std::string& port);

        /// Run the server's io_service loop.
        void run();

        bool deliver(const std::string& buffer);

      private:
        void do_accept();
        void do_await_signal();

        boost::asio::io_context      io_context_;
        boost::asio::any_io_executor strand_{io_context_.get_executor()};
        boost::asio::signal_set      signals_{strand_};
        tcp::acceptor                acceptor_{strand_};
        ConnectionManager            connection_manager_;
    };
  • File connection.cpp

    #include "connection.h"

    #include <boost/algorithm/string.hpp>
    #include <iostream>
    #include <thread>
    #include <utility>
    #include <vector>

    #include "connection_manager.h"
    using boost::system::error_code;

    Connection::Connection(tcp::socket socket, ConnectionManager& manager)
        : socket_(std::move(socket))
        , connection_manager_(manager) {}

    void Connection::start() { // always assumed on the strand (since connection
                               // just constructed)
        do_read();
    }

    void Connection::stop() { // public, might not be on the strand
        post(socket_.get_executor(),
             [self = shared_from_this()]() mutable {
                 self->do_stop();
             });
    }

    void Connection::do_stop() { // assumed on the strand
        socket_.cancel(); // trust shared pointer to destruct
    }

    namespace /*missing code stubs*/ {
        auto split(std::string_view input, char delim) {
            std::vector<std::string_view> result;
            boost::algorithm::split(result, input,
                                    boost::algorithm::is_from_range(delim, delim));
            return result;
        }

        std::string getExecutionJsons()   { return std::string(1024,  'E'); }
        std::string getOrdersAsJsons()    { return std::string(13312, 'O'); }
        std::string getPositionsAsJsons() { return std::string(8192,  'P'); }
        std::string createSyncDoneJson()  { return std::string(24576, 'C'); }
    } // namespace

    void Connection::do_read() {
        auto self(shared_from_this());
        socket_.async_read_some(
            boost::asio::buffer(buffer_),
            [this, self](error_code ec, size_t bytes_transferred) {
                if (!ec) {
                    std::string buff_str =
                        std::string(buffer_.data(), bytes_transferred);
                    const auto& tokenized_buffer = split(buff_str, ' ');

                    if (!tokenized_buffer.empty() &&
                        tokenized_buffer[0] == "sync") {
                        std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
                        /// "syncing connection" sends a specific text
                        /// hence I can separate between syncing and long-lived
                        /// connections here and act accordingly.

                        const auto& exec_json_strs     = getExecutionJsons();
                        const auto& order_json_strs    = getOrdersAsJsons();
                        const auto& position_json_strs = getPositionsAsJsons();
                        const auto& all_json_strs      = exec_json_strs +
                            order_json_strs + position_json_strs +
                            createSyncDoneJson();

                        std::cerr << "All json length: " << all_json_strs.length() << std::endl;
                        /// this is potentially a very large data.
                        do_write(all_json_strs); // already on strand!
                    }

                    do_read();
                } else {
                    std::cerr << "do_read terminating: " << ec.message() << std::endl;
                    connection_manager_.stop(shared_from_this());
                }
            });
    }

    void Connection::write(std::string msg) { // public, might not be on the strand
        post(socket_.get_executor(),
             [self = shared_from_this(), msg = std::move(msg)]() mutable {
                 self->do_write(std::move(msg));
             });
    }

    void Connection::do_write(std::string msg) { // assumed on the strand
        outgoing_.push_back(std::move(msg));

        if (outgoing_.size() == 1)
            do_write_loop();
    }

    void Connection::do_write_loop() {
        if (outgoing_.size() == 0)
            return;

        auto self(shared_from_this());
        async_write( //
            socket_, boost::asio::buffer(outgoing_.front()),
            [this, self](error_code ec, size_t transfer_size) {
                std::cerr << "write completion: " << transfer_size << " bytes ("
                          << ec.message() << ")" << std::endl;

                if (!ec) {
                    outgoing_.pop_front();
                    do_write_loop();
                } else {
                    socket_.cancel();

                    // This would ideally be enough to free the connection, but
                    // since `ConnectionManager` doesn't use `weak_ptr` you need to
                    // force the issue using kind of an "umbilical cord reflux":
                    connection_manager_.stop(self);
                }
            });
    }
  • File connection_manager.cpp

    #include "connection_manager.h"

    #include <functional> // std::mem_fn

    void ConnectionManager::register_and_start(connection_ptr c) {
        connections_.emplace_back(c);
        c->start();
    }

    void ConnectionManager::stop(connection_ptr c) {
        c->stop();
    }

    void ConnectionManager::stop_all() {
        for (auto h : connections_)
            if (auto c = h.lock())
                c->stop();
    }

    /// this function is used to keep clients up to date with the changes, not used
    /// during syncing phase.
    void ConnectionManager::broadcast(const std::string& buffer) {
        for (auto h : connections_)
            if (auto c = h.lock())
                c->write(buffer);
    }

    size_t ConnectionManager::garbage_collect() {
        connections_.remove_if(std::mem_fn(&handle::expired));
        return connections_.size();
    }
  • File server.cpp

    #include "server.h"
    #include <signal.h>
    #include <utility>

    using boost::system::error_code;

    Server::Server(const std::string& address, const std::string& port)
        : io_context_(1) // THREAD HINT: single threaded
        , connection_manager_()
    {
        // Register to handle the signals that indicate when the server should exit.
        // It is safe to register for the same signal multiple times in a program,
        // provided all registration for the specified signal is made through Asio.
        signals_.add(SIGINT);
        signals_.add(SIGTERM);
    #if defined(SIGQUIT)
        signals_.add(SIGQUIT);
    #endif // defined(SIGQUIT)

        do_await_signal();

        // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
        tcp::resolver resolver(io_context_);
        tcp::endpoint endpoint = *resolver.resolve({address, port});
        acceptor_.open(endpoint.protocol());
        acceptor_.set_option(tcp::acceptor::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen();

        do_accept();
    }

    void Server::run() {
        // The io_service::run() call will block until all asynchronous operations
        // have finished. While the server is running, there is always at least one
        // asynchronous operation outstanding: the asynchronous accept call waiting
        // for new incoming connections.
        io_context_.run();
    }

    void Server::do_accept() {
        // separate strand for each connection - just in case you ever add threads
        acceptor_.async_acce