c++ boost-asio boost-coroutine
My server is based on the boost spawn echo server example, improved following this thread. The real server is complex, so I made a simpler one to show the problem:
The server listens on port 12345 and receives 0x4000 bytes of data from each new connection.
The client runs 1000 threads; each connects to the server and sends 0x4000 bytes of data.
The problem: while the client is running, kill the client process with Ctrl-C in the console after about 1 second; the server's io_context then stops and the server spins in an infinite loop consuming 100% CPU. If that does not happen, start the client and kill it again a few times and it will. After several attempts the client may run out of TCP ports; wait a few minutes and try again. On my machine it happens after killing the client 3~15 times.
The Boost documentation says io_context.stopped() is used to determine whether the io_context has stopped,
"either through an explicit call to stop(), or due to running out of work".
I never call io_context.stop(), and I use a make_work_guard(io_context) to keep the io_context from running out of work, so why does it still stop?
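For reference, here is a minimal standalone sketch of how I understand that contract (the explicit stop() call is only there to demonstrate stopped() and restart(); it is not something my server does):

#include <boost/asio.hpp>
#include <iostream>
#include <thread>

int main() {
    boost::asio::io_context io_context;
    // The work guard keeps run() from returning just because there is no pending work.
    auto guard = boost::asio::make_work_guard(io_context);

    std::thread t([&] { io_context.run(); });    // blocks: the guard counts as work

    io_context.stop();                           // explicit stop, for illustration only
    t.join();
    std::cout << std::boolalpha << io_context.stopped() << "\n"; // true

    io_context.restart();                        // required before run() may be called again
    std::cout << io_context.stopped() << "\n";   // false
    guard.reset();                               // release the work guard ...
    io_context.run();                            // ... so this run() returns immediately
}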
My environment: Windows 10 64-bit, Boost 1.71.0.
Server code:
#include <iostream>
#include <thread>     // std::thread::hardware_concurrency
#include <windows.h>  // MessageBox
using namespace std;

#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/exception/diagnostic_information.hpp>
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
namespace ba=boost::asio;

#define SERVER_PORT 12345
#define DATA_LEN 0x4000

struct session : public std::enable_shared_from_this<session>
{
    tcp::socket socket_;
    boost::asio::steady_timer timer_;
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;

    explicit session(boost::asio::io_context& io_context, tcp::socket socket)
        : socket_(std::move(socket)),
          timer_(io_context),
          strand_(io_context.get_executor())
    { }

    void go()
    {
        auto self(shared_from_this());
        boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
        {
            spawn(yield, [this, self](ba::yield_context yield) {
                timer_.expires_from_now(10s); // 10 seconds
                while (socket_.is_open()) {
                    boost::system::error_code ec;
                    timer_.async_wait(yield[ec]);
                    // timeout triggered, timer was not canceled
                    if (ba::error::operation_aborted != ec) {
                        socket_.close();
                    }
                }
            });

            try
            {
                // recv data
                string packet;
                // read data
                boost::system::error_code ec;
                ba::async_read(socket_,
                    ba::dynamic_buffer(packet),
                    ba::transfer_exactly(DATA_LEN),
                    yield[ec]);
                if (ec) {
                    throw "read_fail";
                }
            }
            catch (...)
            {
                cout << "exception" << endl;
            }

            timer_.cancel();
            socket_.close();
        });
    }
};

struct my_server {
    my_server() { }
    ~my_server() { }

    void start() {
        ba::io_context io_context;
        auto worker = ba::make_work_guard(io_context);

        ba::spawn(io_context, [&](ba::yield_context yield)
        {
            tcp::acceptor acceptor(io_context,
                tcp::endpoint(tcp::v4(), SERVER_PORT));
            for (;;)
            {
                boost::system::error_code ec;
                tcp::socket socket(io_context);
                acceptor.async_accept(socket, yield[ec]);
                if (!ec) {
                    std::make_shared<session>(io_context, std::move(socket))->go();
                }
            }
        });

        // Run io_context on all CPUs
        auto thread_count = std::thread::hardware_concurrency();
        boost::thread_group tgroup;
        for (auto i = 0u; i < thread_count; ++i)
            tgroup.create_thread([&] {
                for (;;) {
                    try {
                        if (io_context.stopped()) { // <- this happens after killing the client process several times
                            cout << "io_context STOPPED, now server runs infinite loop with full cpu usage" << endl;
                        }
                        io_context.run();
                    }
                    catch (const std::exception& e) {
                        MessageBox(0, "This never popup", e.what(), 0);
                    }
                    catch (const boost::exception& e) {
                        MessageBox(0, "This never popup", boost::diagnostic_information(e).data(), 0);
                    }
                    catch (...) { MessageBox(0, "This never popup", "", 0); }
                }
            });
        tgroup.join_all();
    }
};

int main() {
    my_server svr;
    svr.start();
}
Client code:
#include <iostream>
#include <random>
#include <thread>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
using namespace std;
using boost::asio::ip::tcp;
namespace ba=boost::asio;

#define SERVER "127.0.0.1"
#define PORT "12345"

int main() {
    boost::asio::io_context io_context;

    static string data_0x4000(0x4000, 'a');

    boost::thread_group tgroup;
    for (auto i = 0; i < 1000; ++i)
        tgroup.create_thread([&] {
            for (;;) {
                try {
                    tcp::socket s(io_context);
                    tcp::resolver resolver(io_context);
                    boost::asio::connect(s, resolver.resolve(SERVER, PORT));
                    ba::write(s, ba::buffer(data_0x4000));
                } catch (std::exception e) {
                    cout << " exception: " << e.what() << endl;
                } catch (...) {
                    cout << "unknown exception" << endl;
                }
            }
        });
    tgroup.join_all();
    return 0;
}
Update, workaround:
I guessed the problem was between io_context and the coroutines, so I tried replacing the unnecessary spawn with a std::thread, and that worked: the io_context never stops. But why does the problem happen in the first place?
Instead of:
ba::spawn(io_context, [&](ba::yield_context yield)
{
    tcp::acceptor acceptor(io_context,
        tcp::endpoint(tcp::v4(), SERVER_PORT));
    for (;;)
    {
        boost::system::error_code ec;
        tcp::socket socket(io_context);
        acceptor.async_accept(socket, yield[ec]);
        if (!ec) {
            std::make_shared<session>(io_context, std::move(socket))->go();
        }
    }
});
With:
std::thread([&]()
{
    tcp::acceptor acceptor(io_context,
        tcp::endpoint(tcp::v4(), SERVER_PORT));
    for (;;)
    {
        boost::system::error_code ec;
        tcp::socket socket(io_context);
        acceptor.accept(socket, ec);
        if (!ec) {
            std::make_shared<session>(io_context, std::move(socket))->go();
        }
    }
}).detach();
I cannot reproduce your problem on Linux, even with (very) extensive stress testing.
Even hard-killing the client process shows no effect beyond some sessions reaching the "EOF" message, as expected.
There is the problem that you run out of usable ports, but that is mostly because you reconnect so quickly in the client.
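As an illustration only, a throttled variant of the client's per-thread loop takes most of that pressure off the ephemeral port range; the 50 ms pause is arbitrary, and the snippet reuses the names from the client shown above (it additionally needs <chrono>):

// Hypothetical variant of the client's per-thread loop: pause between
// attempts so sockets lingering in TIME_WAIT do not exhaust local ports.
for (;;) {
    try {
        tcp::socket s(io_context);
        tcp::resolver resolver(io_context);
        ba::connect(s, resolver.resolve(SERVER, PORT));
        ba::write(s, ba::buffer(data_0x4000));
    } catch (std::exception const& e) {
        std::cout << " exception: " << e.what() << std::endl;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(50)); // throttle reconnects
}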
Thinking outside the box
- I don't know whether this is relevant, but MSVC does have SEH (structured exceptions)¹ — could something non-C++ be slipping past your catch handlers?
- Could it be that you use std::cout and/or MessageBox² from multiple threads without synchronization, and MSVC's standard library does not handle that well?
- You could call io_context.restart(); in between (see the sketch at the end of this answer). I do not recommend that, because it would make any orderly shutdown impossible.

If you are interested, here are some slight tweaks to the code. They add some visualization of the sessions/connections handled. Note that the client is mostly unchanged, but the server has a few changes that might give you ideas:
#include <iostream>
#include <iomanip>

#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

namespace ba = boost::asio;
using boost::asio::ip::tcp;
using namespace std::literals;

#define SERVER_PORT 12345
#define DATA_LEN 0x4000

void MessageBox(int, std::string const& caption, std::string const& message, ...) {
    std::cerr << caption << ": " << std::quoted(message) << std::endl;
}

struct session : public std::enable_shared_from_this<session>
{
    tcp::socket socket_;
    ba::steady_timer timer_;
    ba::strand<ba::io_context::executor_type> strand_;

    explicit session(ba::io_context& io_context, tcp::socket socket)
        : socket_(std::move(socket)),
          timer_(io_context),
          strand_(io_context.get_executor())
    { }

    void go()
    {
        auto self(shared_from_this());
        ba::spawn(strand_, [this, self](ba::yield_context yield)
        {
            spawn(yield, [this, self](ba::yield_context yield) {
                while (socket_.is_open()) {
                    timer_.expires_from_now(10s);
                    boost::system::error_code ec;
                    timer_.async_wait(yield[ec]);
                    // timeout triggered, timer was not canceled
                    if (ba::error::operation_aborted != ec) {
                        socket_.close(ec);
                    }
                }
            });

            try
            {
                // recv data
                std::string packet;

                // read data
                ba::async_read(socket_,
                    ba::dynamic_buffer(packet),
                    ba::transfer_exactly(DATA_LEN),
                    yield);

                std::cout << std::unitbuf << ".";
            }
            catch (std::exception const& e) {
                std::cout << "exception: " << std::quoted(e.what()) << std::endl;
            }
            catch (...) {
                std::cout << "exception" << std::endl;
            }

            boost::system::error_code ec;
            timer_.cancel(ec);
            socket_.close(ec);
        });
    }
};

struct my_server {
    void start() {
        ba::io_context io_context;
        auto worker = ba::make_work_guard(io_context);

        ba::spawn(io_context, [&](ba::yield_context yield)
        {
            tcp::acceptor acceptor(io_context,
                tcp::endpoint(tcp::v4(), SERVER_PORT));

            for (;;)
            {
                boost::system::error_code ec;

                tcp::socket socket(io_context);
                acceptor.async_accept(socket, yield[ec]);
                if (!ec) {
                    std::make_shared<session>(io_context, std::move(socket))->go();
                }
            }
        });

        // Run io_context on all CPUs
        auto thread_count = std::thread::hardware_concurrency();
        boost::thread_group tgroup;
        for (auto i = 0u; i < thread_count; ++i)
            tgroup.create_thread([&] {
                for (;;) {
                    try {
                        io_context.run();
                        break;
                    }
                    catch (const std::exception& e) {
                        MessageBox(0, "This never popup", e.what(), 0);
                    }
                    catch (const boost::exception& e) {
                        MessageBox(0, "This never popup", boost::diagnostic_information(e).data(), 0);
                    }
                    catch (...) { MessageBox(0, "This never popup", "", 0); }
                }

                std::cout << "stopped: " << io_context.stopped() << std::endl;
            });
        tgroup.join_all();
    }
};

int main() {
    my_server svr;
    svr.start();
}

And the client:

#include <iostream>
#include <random>
#include <thread>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

using boost::asio::ip::tcp;
namespace ba=boost::asio;

#define SERVER "127.0.0.1"
#define PORT "12345"

int main() {
    ba::io_context io_context;

    static std::string const data_0x4000(0x4000, 'a');

    boost::thread_group tgroup;
    for (auto i = 0; i < 1000; ++i)
        tgroup.create_thread([&] {
            for (;;) {
                try {
                    tcp::socket s(io_context);

                    tcp::resolver resolver(io_context);
                    ba::connect(s, resolver.resolve(SERVER, PORT));
                    s.set_option(ba::socket_base::reuse_address(true));

                    ba::write(s, ba::buffer(data_0x4000));
                } catch (std::exception const& e) {
                    std::cout << " exception: " << e.what() << std::endl;
                } catch (...) {
                    std::cout << "unknown exception" << std::endl;
                }
                std::cout << std::unitbuf << ".";
            }
        });

    tgroup.join_all();
}

¹ See e.g. https://learn.microsoft.com/en-us/cpp/build/reference/eh-exception-handling-model?view=vs-2019#remarks
² Perhaps MessageBox is only allowed from the "UI" thread.
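Finally, a minimal sketch of the io_context.restart() workaround mentioned above, reusing io_context and tgroup from the server code; again, not recommended, because it defeats any orderly shutdown:

tgroup.create_thread([&] {
    for (;;) {
        try {
            io_context.run();                   // returns when stopped or out of work
        } catch (std::exception const& e) {
            std::cerr << e.what() << std::endl; // a handler threw; just call run() again
        }
        if (io_context.stopped())
            io_context.restart();               // clear the stopped state before re-running
    }
});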