Ayd*_*can 3 c++ sockets boost boost-asio
当我优雅地关闭连接到它的客户端时,我的服务器崩溃了,而客户端正在接收大量数据。我正在考虑一个可能的终生错误,就像 boost ASIO 中的大多数错误一样,但是我自己无法指出我的错误。
每个客户端与服务器建立 2 个连接,其中一个用于同步,另一个连接是长期连接,用于接收持续更新。在“同步阶段”,客户端接收大量数据以与服务器状态同步(“状态”基本上是 JSON 格式的数据库数据)。同步完成后,同步连接将关闭。客户端通过其他连接接收数据库的更新(与“同步数据”相比,这些数据当然是非常小的数据)。
这些是相关文件:
连接.h
#pragma once
#include <array>
#include <memory>
#include <string>
#include <boost/asio.hpp>
class ConnectionManager;
/// Represents a single connection from a client.
class Connection : public std::enable_shared_from_this<Connection>
{
public:
Connection(const Connection&) = delete;
Connection& operator=(const Connection&) = delete;
/// Construct a connection with the given socket.
explicit Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager);
/// Start the first asynchronous operation for the connection.
void start();
/// Stop all asynchronous operations associated with the connection.
void stop();
/// Perform an asynchronous write operation.
void do_write(const std::string& buffer);
int getNativeHandle();
~Connection();
private:
/// Perform an asynchronous read operation.
void do_read();
/// Socket for the connection.
boost::asio::ip::tcp::socket socket_;
/// The manager for this connection.
ConnectionManager& connection_manager_;
/// Buffer for incoming data.
std::array<char, 8192> buffer_;
std::string outgoing_buffer_;
};
typedef std::shared_ptr<Connection> connection_ptr;
Run Code Online (Sandbox Code Playgroud)
连接.cpp
#include "connection.h"
#include <utility>
#include <vector>
#include <iostream>
#include <thread>
#include "connection_manager.h"
Connection::Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager)
: socket_(std::move(socket))
, connection_manager_(manager)
{
}
void Connection::start()
{
do_read();
}
void Connection::stop()
{
socket_.close();
}
Connection::~Connection()
{
}
void Connection::do_read()
{
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(buffer_), [this, self](boost::system::error_code ec, std::size_t bytes_transferred) {
if (!ec) {
std::string buff_str = std::string(buffer_.data(), bytes_transferred);
const auto& tokenized_buffer = split(buff_str, ' ');
if(!tokenized_buffer.empty() && tokenized_buffer[0] == "sync") {
/// "syncing connection" sends a specific text
/// hence I can separate between sycing and long-lived connections here and act accordingly.
const auto& exec_json_strs = getExecutionJsons();
const auto& order_json_strs = getOrdersAsJsons();
const auto& position_json_strs = getPositionsAsJsons();
const auto& all_json_strs = exec_json_strs + order_json_strs + position_json_strs + createSyncDoneJson();
/// this is potentially a very large data.
do_write(all_json_strs);
}
do_read();
} else {
connection_manager_.stop(shared_from_this());
}
});
}
void Connection::do_write(const std::string& write_buffer)
{
outgoing_buffer_ = write_buffer;
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()), [this, self](boost::system::error_code ec, std::size_t transfer_size) {
if (!ec) {
/// everything is fine.
} else {
/// what to do here?
/// server crashes once I get error code 32 (EPIPE) here.
}
});
}
Run Code Online (Sandbox Code Playgroud)
连接管理器.h
#pragma once
#include <set>
#include "connection.h"
/// Manages open connections so that they may be cleanly stopped when the server
/// needs to shut down.
class ConnectionManager
{
public:
ConnectionManager(const ConnectionManager&) = delete;
ConnectionManager& operator=(const ConnectionManager&) = delete;
/// Construct a connection manager.
ConnectionManager();
/// Add the specified connection to the manager and start it.
void start(connection_ptr c);
/// Stop the specified connection.
void stop(connection_ptr c);
/// Stop all connections.
void stop_all();
void sendAllConnections(const std::string& buffer);
private:
/// The managed connections.
std::set<connection_ptr> connections_;
};
Run Code Online (Sandbox Code Playgroud)
连接管理器.cpp
#include "connection_manager.h"
ConnectionManager::ConnectionManager()
{
}
void ConnectionManager::start(connection_ptr c)
{
connections_.insert(c);
c->start();
}
void ConnectionManager::stop(connection_ptr c)
{
connections_.erase(c);
c->stop();
}
void ConnectionManager::stop_all()
{
for (auto c: connections_)
c->stop();
connections_.clear();
}
/// this function is used to keep clients up to date with the changes, not used during syncing phase.
void ConnectionManager::sendAllConnections(const std::string& buffer)
{
for (auto c: connections_)
c->do_write(buffer);
}
Run Code Online (Sandbox Code Playgroud)
服务器.h
#pragma once
#include <boost/asio.hpp>
#include <string>
#include "connection.h"
#include "connection_manager.h"
class Server
{
public:
Server(const Server&) = delete;
Server& operator=(const Server&) = delete;
/// Construct the server to listen on the specified TCP address and port, and
/// serve up files from the given directory.
explicit Server(const std::string& address, const std::string& port);
/// Run the server's io_service loop.
void run();
void deliver(const std::string& buffer);
private:
/// Perform an asynchronous accept operation.
void do_accept();
/// Wait for a request to stop the server.
void do_await_stop();
/// The io_service used to perform asynchronous operations.
boost::asio::io_service io_service_;
/// The signal_set is used to register for process termination notifications.
boost::asio::signal_set signals_;
/// Acceptor used to listen for incoming connections.
boost::asio::ip::tcp::acceptor acceptor_;
/// The connection manager which owns all live connections.
ConnectionManager connection_manager_;
/// The *NEXT* socket to be accepted.
boost::asio::ip::tcp::socket socket_;
};
Run Code Online (Sandbox Code Playgroud)
服务器.cpp
#include "server.h"
#include <signal.h>
#include <utility>
Server::Server(const std::string& address, const std::string& port)
: io_service_()
, signals_(io_service_)
, acceptor_(io_service_)
, connection_manager_()
, socket_(io_service_)
{
// Register to handle the signals that indicate when the server should exit.
// It is safe to register for the same signal multiple times in a program,
// provided all registration for the specified signal is made through Asio.
signals_.add(SIGINT);
signals_.add(SIGTERM);
#if defined(SIGQUIT)
signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)
do_await_stop();
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::resolver resolver(io_service_);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve({address, port});
acceptor_.open(endpoint.protocol());
acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
do_accept();
}
void Server::run()
{
// The io_service::run() call will block until all asynchronous operations
// have finished. While the server is running, there is always at least one
// asynchronous operation outstanding: the asynchronous accept call waiting
// for new incoming connections.
io_service_.run();
}
void Server::do_accept()
{
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
{
// Check whether the server was stopped by a signal before this
// completion handler had a chance to run.
if (!acceptor_.is_open())
{
return;
}
if (!ec)
{
connection_manager_.start(std::make_shared<Connection>(
std::move(socket_), connection_manager_));
}
do_accept();
});
}
void Server::do_await_stop()
{
signals_.async_wait(
[this](boost::system::error_code /*ec*/, int /*signo*/)
{
// The server is stopped by cancelling all outstanding asynchronous
// operations. Once all operations have finished the io_service::run()
// call will exit.
acceptor_.close();
connection_manager_.stop_all();
});
}
/// this function is used to keep clients up to date with the changes, not used during syncing phase.
void Server::deliver(const std::string& buffer)
{
connection_manager_.sendAllConnections(buffer);
}
Run Code Online (Sandbox Code Playgroud)
所以,我重复我的问题:当我优雅地关闭连接到它的客户端时,我的服务器崩溃了,而客户端正在接收大量数据,我不知道为什么。
编辑:一旦我收到 EPIPE 错误,async_write 函数就会发生崩溃。该应用程序是多线程的。有 4 个线程在生成数据时使用自己的数据调用 Server::deliver。Deliver() 用于使客户端保持最新状态,它与初始同步无关:同步是通过从数据库获取的持久数据完成的。
我有一个 io_service,所以我认为我不需要链。io_service::run 在主线程上调用,因此主线程处于阻塞状态。
审查,添加一些缺失的代码位:
\nnamespace /*missing code stubs*/ {\n auto split(std::string_view input, char delim) {\n std::vector<std::string_view> result;\n boost::algorithm::split(result, input,\n boost::algorithm::is_from_range(delim, delim));\n return result;\n }\n\n std::string getExecutionJsons() { return ""; }\n std::string getOrdersAsJsons() { return ""; }\n std::string getPositionsAsJsons() { return ""; }\n std::string createSyncDoneJson() { return ""; }\n}\n
Run Code Online (Sandbox Code Playgroud)\n现在我注意到的事情是:
\n你有一个io_service
,所以只有一个线程。好的,所以除非您的其他代码中有线程(例如?),否则不需要任何线程main
。
怀疑线程正在发挥作用的一个特殊原因是没有人可能Server::deliver
因为run()
阻塞而调用。这意味着每当您deliver()
现在调用时,它都会导致数据争用,从而导致未定义的行为
随意的评论
\n /// this function is used to keep clients up to date with the changes,\n /// not used during syncing phase.\n
Run Code Online (Sandbox Code Playgroud)\n并没有做太多事情来消除这种担忧。代码需要防止滥用。评论不会被执行。让它变得更好:
\n void Server::deliver(const std::string& buffer) {\n post(io_context_,\n [this, buffer] { connection_manager_.broadcast(std::move(buffer)); });\n }\n
Run Code Online (Sandbox Code Playgroud)\n在接受“新”写入之前,您不会检查先前的写入是否已完成。这意味着调用Connection::do_write
会导致未定义的行为,原因有两个:
outgoing_buffer_
在使用该缓冲区的正在进行的异步操作期间进行修改是 UB
async_write
在同一个 IO 对象上有两个重叠的是 UB (请参阅文档
解决这个问题的典型方法是使用传出消息队列。
\nusingasync_read_some
很少是您想要的,特别是因为读取不会累积到动态缓冲区中。这意味着,如果您的数据包在意外边界处分离,您可能根本无法检测到命令,或者错误地检测到命令。
相反,请考虑asio::async_read_until
使用动态缓冲区(例如
std::string
这样您就不必将缓冲区复制到字符串中streambuf
,以便您可以使用std::istream(&sbuf_)
解析而不是标记化连接all_json_strs
显然必须拥有文本容器的内容是浪费的。相反,使用 const-buffer-sequence 将它们全部组合起来而不进行复制。
更好的是,考虑采用流式方法进行 JSON 序列化,这样就不需要在任何给定时间在内存中序列化所有 JSON。
\n不要声明空析构函数 ( ~Connection
)。他们是悲观主义者
对于空构造函数 ( ConnectionManager
) 也是如此。如果必须的话,请考虑
ConnectionManager::ConnectionManager() = default;\n
Run Code Online (Sandbox Code Playgroud)\n这getNativeHandle
给了我更多关于可能干扰的其他代码的问题。例如,它可能表明其他库正在执行操作,这又可能导致重叠读/写,或者它可能是更多代码存在于线程上的标志(正如Server::run()
定义阻塞一样)
连接管理器可能应该保留weak_ptr
,以便Connection
s 最终终止。现在,最后一个引用根据定义保存在连接管理器中,这意味着当对等方断开连接或会话由于某种其他原因失败时,任何内容都不会被破坏。
这不是惯用语:
\n// Check whether the server was stopped by a signal before this\n// completion handler had a chance to run.\nif (!acceptor_.is_open()) {\n return;\n}\n
Run Code Online (Sandbox Code Playgroud)\n如果您关闭了接受器,error::operation_aborted
无论如何都会调用完成处理程序。只需处理这个问题,例如在我稍后发布的最终版本中:
// separate strand for each connection - just in case you ever add threads\nacceptor_.async_accept(\n make_strand(io_context_), [this](error_code ec, tcp::socket sock) {\n if (!ec) {\n connection_manager_.register_and_start(\n std::make_shared<Connection>(std::move(sock),\n connection_manager_));\n do_accept();\n }\n });\n
Run Code Online (Sandbox Code Playgroud)\n我注意到这个评论:
\n// The server is stopped by cancelling all outstanding asynchronous\n// operations. Once all operations have finished the io_service::run()\n// call will exit.\n
Run Code Online (Sandbox Code Playgroud)\n事实上,您从未对cancel()
代码中的任何 IO 对象进行任何操作。同样,注释不会被执行。最好确实按照你所说的去做,并让析构函数关闭资源。这可以防止在关闭后使用对象时出现虚假错误,并且还可以防止非常烦人的竞争条件,例如您关闭了句柄,其他一些线程在同一文件描述符上重新打开了一个新流,并且您已将句柄交给了第三个派对(使用getNativeHandle
)...你明白这会导致什么结果吗?
通过这种方式查看后,我尝试重现该问题,因此我创建了虚假数据:
\n std::string getExecutionJsons() { return std::string(1024, \'E\'); }\n std::string getOrdersAsJsons() { return std::string(13312, \'O\'); }\n std::string getPositionsAsJsons() { return std::string(8192, \'P\'); }\n std::string createSyncDoneJson() { return std::string(24576, \'C\'); }\n
Run Code Online (Sandbox Code Playgroud)\n对 Connection 类进行一些细微的调整:
\n std::string buff_str =\n std::string(buffer_.data(), bytes_transferred);\n const auto& tokenized_buffer = split(buff_str, \' \');\n\n if (!tokenized_buffer.empty() &&\n tokenized_buffer[0] == "sync") {\n std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;\n /// "syncing connection" sends a specific text\n /// hence I can separate between sycing and long-lived\n /// connections here and act accordingly.\n\n const auto& exec_json_strs = getExecutionJsons();\n const auto& order_json_strs = getOrdersAsJsons();\n const auto& position_json_strs = getPositionsAsJsons();\n const auto& all_json_strs = exec_json_strs +\n order_json_strs + position_json_strs +\n createSyncDoneJson();\n\n std::cerr << "All json length: " << all_json_strs.length() << std::endl;\n /// this is potentially a very large data.\n do_write(all_json_strs); // already on strand!\n }\n
Run Code Online (Sandbox Code Playgroud)\n我们得到服务器输出
\nsync detected on 127.0.0.1:43012\nAll json length: 47104\nsync detected on 127.0.0.1:43044\nAll json length: 47104\n
Run Code Online (Sandbox Code Playgroud)\n以及使用 netcat 伪造的客户端:
\n$ netcat localhost 8989 <<< \'sync me\' > expected\n^C\n$\xc2\xa0wc -c expected \n47104 expected\n
Run Code Online (Sandbox Code Playgroud)\n好的。现在让我们过早断开连接:
\nnetcat localhost 8989 -w0 <<< \'sync me\' > truncated\n$ wc -c truncated \n0 truncated\n
Run Code Online (Sandbox Code Playgroud)\n所以,它确实导致提前关闭,但服务器仍然说
\nsync detected on 127.0.0.1:44176\nAll json length: 47104\n
Run Code Online (Sandbox Code Playgroud)\n我们do_write
也来测试一下:
async_write( //\n socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()),\n [/*this,*/ self](error_code ec, size_t transfer_size) {\n std::cerr << "do_write completion: " << transfer_size << " bytes ("\n << ec.message() << ")" << std::endl;\n\n if (!ec) {\n /// everything is fine.\n } else {\n /// what to do here?\n // FIXME: probably cancel the read loop so the connection\n // closes?\n }\n });\n
Run Code Online (Sandbox Code Playgroud)\n现在我们看到:
\nsync detected on 127.0.0.1:44494\nAll json length: 47104\ndo_write completion: 47104 bytes (Success)\nsync detected on 127.0.0.1:44512\nAll json length: 47104\ndo_write completion: 32768 bytes (Operation canceled)\n
Run Code Online (Sandbox Code Playgroud)\n对于一个断开连接和一个“正常”连接。
\n没有崩溃/未定义行为的迹象。让我们检查一下-fsanitize=address,undefined
:干净的记录,甚至添加心跳:
int main() {\n Server s("127.0.0.1", "8989");\n\n std::thread yolo([&s] {\n using namespace std::literals;\n int i = 1;\n\n do {\n std::this_thread::sleep_for(5s);\n } while (s.deliver("HEARTBEAT DEMO " + std::to_string(i++)));\n });\n\n s.run();\n\n yolo.join();\n}\n
Run Code Online (Sandbox Code Playgroud)\n上面强调的唯一没有解决的问题是:
\n未显示其他线程问题(可能通过getNativeHandle
)
事实上,您可以在 Connection 中进行重叠写入do_write
。解决这个问题:
void Connection::write(std::string msg) { // public, might not be on the strand\n post(socket_.get_executor(),\n [self = shared_from_this(), msg = std::move(msg)]() mutable {\n self->do_write(std::move(msg));\n });\n }\n\n void Connection::do_write(std::string msg) { // assumed on the strand\n outgoing_.push_back(std::move(msg));\n\n if (outgoing_.size() == 1)\n do_write_loop();\n }\n\n void Connection::do_write_loop() {\n if (outgoing_.size() == 0)\n return;\n\n auto self(shared_from_this());\n async_write( //\n socket_, boost::asio::buffer(outgoing_.front()),\n [this, self](error_code ec, size_t transfer_size) {\n std::cerr << "write completion: " << transfer_size << " bytes ("\n << ec.message() << ")" << std::endl;\n\n if (!ec) {\n outgoing_.pop_front();\n do_write_loop();\n } else {\n socket_.cancel();\n\n // This would ideally be enough to free the connection, but\n // since `ConnectionManager` doesn\'t use `weak_ptr` you need to\n // force the issue using kind of an "umbillical cord reflux":\n connection_manager_.stop(self);\n }\n });\n }\n
Run Code Online (Sandbox Code Playgroud)\n正如您所看到的,我还拆分了write
/do_write
以防止链外调用。与 相同stop
。
包含上面所有备注/修复的完整列表:
\n文件connection.h
#pragma once\n\n #include <boost/asio.hpp>\n\n #include <array>\n #include <deque>\n #include <memory>\n #include <string>\n using boost::asio::ip::tcp;\n\n class ConnectionManager;\n\n /// Represents a single connection from a client.\n class Connection : public std::enable_shared_from_this<Connection> {\n public:\n Connection(const Connection&) = delete;\n Connection& operator=(const Connection&) = delete;\n\n /// Construct a connection with the given socket.\n explicit Connection(tcp::socket socket, ConnectionManager& manager);\n\n void start();\n void stop();\n void write(std::string msg);\n\n private:\n void do_stop();\n void do_write(std::string msg);\n void do_write_loop();\n\n /// Perform an asynchronous read operation.\n void do_read();\n\n /// Socket for the connection.\n tcp::socket socket_;\n\n /// The manager for this connection.\n ConnectionManager& connection_manager_;\n\n /// Buffer for incoming data.\n std::array<char, 8192> buffer_;\n\n std::deque<std::string> outgoing_;\n };\n\n using connection_ptr = std::shared_ptr<Connection>;\n
Run Code Online (Sandbox Code Playgroud)\n文件connection_manager.h
#pragma once\n\n #include <list>\n #include "connection.h"\n\n /// Manages open connections so that they may be cleanly stopped when the server\n /// needs to shut down.\n class ConnectionManager {\n public:\n ConnectionManager(const ConnectionManager&) = delete;\n ConnectionManager& operator=(const ConnectionManager&) = delete;\n ConnectionManager() = default; // could be split across h/cpp if you wanted\n\n void register_and_start(connection_ptr c);\n void stop(connection_ptr c);\n void stop_all();\n\n void broadcast(const std::string& buffer);\n\n // purge defunct connections, returns remaining active connections\n size_t garbage_collect();\n\n private:\n using handle = std::weak_ptr<connection_ptr::element_type>;\n std::list<handle> connections_;\n };\n
Run Code Online (Sandbox Code Playgroud)\n文件server.h
#pragma once\n\n #include <boost/asio.hpp>\n #include <string>\n #include "connection.h"\n #include "connection_manager.h"\n\n class Server {\n public:\n Server(const Server&) = delete;\n Server& operator=(const Server&) = delete;\n\n /// Construct the server to listen on the specified TCP address and port,\n /// and serve up files from the given directory.\n explicit Server(const std::string& address, const std::string& port);\n\n /// Run the server\'s io_service loop.\n void run();\n\n bool deliver(const std::string& buffer);\n\n private:\n void do_accept();\n void do_await_signal();\n\n boost::asio::io_context io_context_;\n boost::asio::any_io_executor strand_{io_context_.get_executor()};\n boost::asio::signal_set signals_{strand_};\n tcp::acceptor acceptor_{strand_};\n ConnectionManager connection_manager_;\n };\n
Run Code Online (Sandbox Code Playgroud)\n文件connection.cpp
#include "connection.h"\n\n #include <boost/algorithm/string.hpp>\n #include <iostream>\n #include <thread>\n #include <utility>\n #include <vector>\n\n #include "connection_manager.h"\n using boost::system::error_code;\n\n Connection::Connection(tcp::socket socket, ConnectionManager& manager)\n : socket_(std::move(socket))\n , connection_manager_(manager) {}\n\n void Connection::start() { // always assumed on the strand (since connection\n // just constructed)\n do_read();\n }\n\n void Connection::stop() { // public, might not be on the strand\n post(socket_.get_executor(),\n [self = shared_from_this()]() mutable {\n self->do_stop();\n });\n }\n\n void Connection::do_stop() { // assumed on the strand\n socket_.cancel(); // trust shared pointer to destruct\n }\n\n namespace /*missing code stubs*/ {\n auto split(std::string_view input, char delim) {\n std::vector<std::string_view> result;\n boost::algorithm::split(result, input,\n boost::algorithm::is_from_range(delim, delim));\n return result;\n }\n\n std::string getExecutionJsons() { return std::string(1024, \'E\'); }\n std::string getOrdersAsJsons() { return std::string(13312, \'O\'); }\n std::string getPositionsAsJsons() { return std::string(8192, \'P\'); }\n std::string createSyncDoneJson() { return std::string(24576, \'C\'); }\n } // namespace\n\n void Connection::do_read() {\n auto self(shared_from_this());\n socket_.async_read_some(\n boost::asio::buffer(buffer_),\n [this, self](error_code ec, size_t bytes_transferred) {\n if (!ec) {\n std::string buff_str =\n std::string(buffer_.data(), bytes_transferred);\n const auto& tokenized_buffer = split(buff_str, \' \');\n\n if (!tokenized_buffer.empty() &&\n tokenized_buffer[0] == "sync") {\n std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;\n /// "syncing connection" sends a specific text\n /// hence I can separate between sycing and long-lived\n /// connections here and act accordingly.\n\n const auto& exec_json_strs = getExecutionJsons();\n const auto& order_json_strs = getOrdersAsJsons();\n const auto& position_json_strs = getPositionsAsJsons();\n const auto& all_json_strs = exec_json_strs +\n order_json_strs + position_json_strs +\n createSyncDoneJson();\n\n std::cerr << "All json length: " << all_json_strs.length() << std::endl;\n /// this is potentially a very large data.\n do_write(all_json_strs); // already on strand!\n }\n\n do_read();\n } else {\n std::cerr << "do_read terminating: " << ec.message() << std::endl;\n connection_manager_.stop(shared_from_this());\n }\n });\n }\n\n void Connection::write(std::string msg) { // public, might not be on the strand\n post(socket_.get_executor(),\n [self = shared_from_this(), msg = std::move(msg)]() mutable {\n self->do_write(std::move(msg));\n });\n }\n\n void Connection::do_write(std::string msg) { // assumed on the strand\n outgoing_.push_back(std::move(msg));\n\n if (outgoing_.size() == 1)\n do_write_loop();\n }\n\n void Connection::do_write_loop() {\n if (outgoing_.size() == 0)\n return;\n\n auto self(shared_from_this());\n async_write( //\n socket_, boost::asio::buffer(outgoing_.front()),\n [this, self](error_code ec, size_t transfer_size) {\n std::cerr << "write completion: " << transfer_size << " bytes ("\n << ec.message() << ")" << std::endl;\n\n if (!ec) {\n outgoing_.pop_front();\n do_write_loop();\n } else {\n socket_.cancel();\n\n // This would ideally be enough to free the connection, but\n // since `ConnectionManager` doesn\'t use `weak_ptr` you need to\n // force the issue using kind of an "umbellical cord reflux":\n connection_manager_.stop(self);\n }\n });\n }\n
Run Code Online (Sandbox Code Playgroud)\n文件connection_manager.cpp
#include "connection_manager.h"\n\n void ConnectionManager::register_and_start(connection_ptr c) {\n connections_.emplace_back(c);\n c->start();\n }\n\n void ConnectionManager::stop(connection_ptr c) {\n c->stop();\n }\n\n void ConnectionManager::stop_all() {\n for (auto h : connections_)\n if (auto c = h.lock())\n c->stop();\n }\n\n /// this function is used to keep clients up to date with the changes, not used\n /// during syncing phase.\n void ConnectionManager::broadcast(const std::string& buffer) {\n for (auto h : connections_)\n if (auto c = h.lock())\n c->write(buffer);\n }\n\n size_t ConnectionManager::garbage_collect() {\n connections_.remove_if(std::mem_fn(&handle::expired));\n return connections_.size();\n }\n
Run Code Online (Sandbox Code Playgroud)\n文件server.cpp
#include "server.h"\n #include <signal.h>\n #include <utility>\n\n using boost::system::error_code;\n\n Server::Server(const std::string& address, const std::string& port)\n : io_context_(1) // THREAD HINT: single threaded\n , connection_manager_()\n {\n // Register to handle the signals that indicate when the server should exit.\n // It is safe to register for the same signal multiple times in a program,\n // provided all registration for the specified signal is made through Asio.\n signals_.add(SIGINT);\n signals_.add(SIGTERM);\n #if defined(SIGQUIT)\n signals_.add(SIGQUIT);\n #endif // defined(SIGQUIT)\n\n do_await_signal();\n\n // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).\n tcp::resolver resolver(io_context_);\n tcp::endpoint endpoint = *resolver.resolve({address, port});\n acceptor_.open(endpoint.protocol());\n acceptor_.set_option(tcp::acceptor::reuse_address(true));\n acceptor_.bind(endpoint);\n acceptor_.listen();\n\n do_accept();\n }\n\n void Server::run() {\n // The io_service::run() call will block until all asynchronous operations\n // have finished. While the server is running, there is always at least one\n // asynchronous operation outstanding: the asynchronous accept call waiting\n // for new incoming connections.\n io_context_.run();\n }\n\n void Server::do_accept() {\n // separate strand for each connection - just in case you ever add threads\n acceptor_.async_acce
归档时间: |
|
查看次数: |
435 次 |
最近记录: |