mos*_*osh 1 c++ boost boost-asio
我正在尝试编写服务器,boost::asio 但我希望boost::asio::async_read如果没有数据到来,操作就会超时,但我可以弄清楚如何做到这一点。这是我到目前为止的代码
void do_read_header() {
auto self(shared_from_this());
std::cout << "do_read_header\n";
boost::asio::async_read(
socket_, boost::asio::buffer(res.data(), res.header_length),
[this, self](boost::system::error_code ec,
std::size_t length) {
if (!ec && res.decode_header()) {
do_read_body();
}
});
do_write();
}
void do_read_body() {
auto self(shared_from_this());
Message msg;
std::cout << "do_read_body\n";
boost::asio::async_read(
socket_, boost::asio::buffer(res.body(), res.body_length()),
[this, self](boost::system::error_code ec,
std::size_t length) {
if (!length) {
return;
}
if (!ec) {
try {
std::cout << "read " << res.body() << "\n";
request_queue_.send(res.body(), res.body_length(),
0);
} catch (const std::exception& ex) {
std::cout << ex.what() << "\n";
}
} else {
if (ec) {
std::cerr << "read error:" << ec.value()
<< " message: " << ec.message() << "\n";
}
socket_.close();
}
do_read_header();
});
}
void start() {
post(strand_, [this, self = shared_from_this()] {
do_read_header();
do_write();
});
}
class Server {
public:
Server(boost::asio::io_service& io_service, short port)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
socket_, [this](boost::system::error_code ec) {
if (!ec) {
std::cout << "accept connection\n";
std::make_shared<Session>(std::move(socket_))
->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
};
Run Code Online (Sandbox Code Playgroud)
您可以添加一个截止时间计时器来取消 IO 操作。您可以观察到取消,因为将使用 调用完成error::operation_aborted。
deadline_.expires_from_now(1s);
deadline_.async_wait([self, this] (error_code ec) {
if (!ec) socket_.cancel();
});
Run Code Online (Sandbox Code Playgroud)
我花了大约 45 分钟使代码的其余部分变得独立:
另请注意,我们避免关闭套接字 - 这是在会话的析构函数中完成的。最好优雅地关闭。
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iomanip>
#include <iostream>
using namespace std::chrono_literals;
namespace bip = boost::interprocess;
using boost::asio::ip::tcp;
using boost::system::error_code;
using Queue = boost::interprocess::message_queue;
static constexpr auto MAX_MESG_LEN = 100;
static constexpr auto MAX_MESGS = 10;
struct Message {
using Len = boost::endian::big_uint32_t;
struct header_t {
Len len;
};
static const auto header_length = sizeof(header_t);
std::array<char, MAX_MESG_LEN + header_length> buf;
char const* data() const { return buf.data(); }
char* data() { return buf.data(); }
char const* body() const { return data() + header_length; }
char* body() { return data() + header_length; }
static_assert(std::is_standard_layout_v<header_t> and
std::is_trivial_v<header_t>);
Len body_length() const { return std::min(h().len, max_body_length()); }
Len max_body_length() const { return buf.max_size() - header_length; }
bool decode_header() { return h().len <= max_body_length(); }
bool set_body(std::string_view value) {
assert(value.length() <= max_body_length());
h().len = value.length();
std::copy_n(value.begin(), body_length(), body());
return (value.length() == body_length()); // not truncated
}
private:
header_t& h() { return *reinterpret_cast<header_t*>(data()); }
header_t const& h() const { return *reinterpret_cast<header_t const*>(data()); }
};
struct Session : std::enable_shared_from_this<Session> {
Session(tcp::socket&& s) : socket_(std::move(s)) {}
void start() {
post(strand_,
[ this, self = shared_from_this() ] { do_read_header(); });
}
private:
using Strand = boost::asio::strand<tcp::socket::executor_type>;
using Timer = boost::asio::steady_timer;
tcp::socket socket_{strand_};
Strand strand_{make_strand(socket_.get_executor())};
Message res;
Queue request_queue_{bip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN};
Timer recv_deadline_{strand_};
void do_read_header() {
auto self(shared_from_this());
std::cout << "do_read_header: " << res.header_length << std::endl;
recv_deadline_.expires_from_now(5s);
recv_deadline_.async_wait([ self, this ](error_code ec) {
if (!ec) {
std::cerr << "header timeout" << std::endl;
socket_.cancel();
}
});
boost::asio::async_read(
socket_, boost::asio::buffer(res.data(), res.header_length),
[ this, self ](error_code ec, size_t /*length*/) {
std::cerr << "header: " << ec.message() << std::endl;
recv_deadline_.cancel();
if (!ec && res.decode_header()) {
do_read_body();
} else {
socket_.shutdown(tcp::socket::shutdown_both);
}
});
}
void do_read_body() {
auto self(shared_from_this());
// Message msg;
std::cout << "do_read_body: " << res.body_length() << std::endl;
recv_deadline_.expires_from_now(1s);
recv_deadline_.async_wait([self, this] (error_code ec) {
if (!ec) {
std::cerr << "body timeout" << std::endl;
socket_.cancel();
}
});
boost::asio::async_read(
socket_,
boost::asio::buffer(res.body(), res.body_length()),
boost::asio::transfer_exactly(res.body_length()),
[ this, self ](error_code ec, std::size_t length) {
std::cerr << "body: " << ec.message() << std::endl;
recv_deadline_.cancel();
if (!ec) {
try {
// Not safe to print unless NUL-terminated, see e.g.
// /sf/ask/4639516941/#66279497
if (length)
request_queue_.send(res.body(), res.body_length(), 0);
} catch (const std::exception& ex) {
std::cout << ex.what() << std::endl;
}
do_read_header();
} else {
socket_.shutdown(tcp::socket::shutdown_both);
}
});
}
};
class Server {
public:
Server(boost::asio::io_service& io_service, short port)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(socket_, [ this ](error_code ec) {
std::cerr << "async_accept: " << ec.message() << std::endl;
if (!ec) {
std::cerr << "session: " << socket_.remote_endpoint() << std::endl;
std::make_shared<Session>(std::move(socket_))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
};
int main(int argc, char**) {
Queue queue{bip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN}; // ensure it exists
if (argc == 1) {
boost::asio::io_context ioc;
Server s(ioc, 8989);
ioc.run_for(10s);
} else {
while (true) {
using Buf = std::array<char, MAX_MESG_LEN>;
Buf buf;
unsigned prio;
size_t n;
queue.receive(buf.data(), buf.size(), n, prio);
std::cout << "Received: " << std::quoted(std::string_view(buf.data(), n)) << std::endl;
}
}
}
Run Code Online (Sandbox Code Playgroud)
可测试用
./sotest
Run Code Online (Sandbox Code Playgroud)
在另一个终端中:
./sotest consumer
Run Code Online (Sandbox Code Playgroud)
还有其他地方,例如一些不会超时的请求:
for msg in '0000 0000' '0000 0001 31' '0000 000c 6865 6c6c 6f20 776f 726c 640a'
do
xxd -r -p <<< "$msg" |
netcat localhost 8989 -w 1
done
Run Code Online (Sandbox Code Playgroud)
或者,单个会话上多个请求,然后会话超时(-w 6超过5秒):
msg='0000 0000 0000 0001 31 0000 000c 6865 6c6c 6f20 776f 726c 640a'; xxd -r -p <<< "$msg"| netcat localhost 8989 -w 6
Run Code Online (Sandbox Code Playgroud)