Vla*_*hin 1 c++ boost tcp client-server boost-asio
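I recently switched from libevent to boost::asio, and after a week I noticed something odd: when I read data from a client, some of that data appears to be duplicated, as if the library had not marked it as read (or something along those lines). My "read" method looks like this: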
void client::doRead() {
    delete readBuffer; // getting rid of old data
    readBuffer = new SerializedBuffer((uint) READ_BUFFER_SIZE);
    readBuffer->position(0);
    asio::async_read(socket, asio::buffer(readBuffer->bytes(), READ_BUFFER_SIZE),
                     asio::transfer_at_least(1),
                     boost::bind(&client::onRead, shared_from_this(), _1, _2));
}
std::mutex readMutex;
void client::onRead(const asio::error_code &err, size_t nbytes) {
    if (readMutex.try_lock()) {
        if (err) {
            stop();
            return;
        }
        readBuffer->rewind();
        uint limit = (uint) nbytes;
        readBuffer->limit(limit);
        // I've lost hope
        SerializedBuffer *sbuff = new SerializedBuffer(limit);
        memcpy(sbuff->bytes(), readBuffer->bytes(), limit);
        sbuff->limit(limit);
        // that's where I'm checking what I get
        Utils::print(sbuff->bytes(), limit);
        onReceivedData(shared_from_this(), sbuff);
        delete sbuff;
        readMutex.unlock();
    }
}
For example, the client sends data like this:
<< 38
<< FA 6F 73 ... BE BB
(two separate writes of binary data)
But the server receives:
>> FA
>> FA 6F 73 ... BE BB
"Where did the byte 38 go?" I ask myself.
This happens with a probability of 10% after 3-4 packets have been transferred successfully.
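There are too many anti-patterns here.
- Manual new/delete.
- Manual lock management (try_lock is a smell).
- If you do use try_lock, handle all return values! (And use std::adopt_lock / std::defer_lock.)
- A global mutex guarding per-object state (asio::io_service::strand is what you need).
- Passing raw pointers around (asio::const_buffer*).
- Why does print take a raw pointer + size? Boost has string_ref, the standard library has string_view, and Asio has the asio::buffer facilities.
- If a class has both position() and position(uint) members, it is a quasi-class and you might as well expose the member field.
- Make READ_BUFFER_SIZE unsigned (e.g. #define RBS 1024u) instead of casting it.
- Have SerializedBuffer initialize itself properly instead of requiring readBuffer->position(0); immediately after construction.
I know you are using non-Boost Asio, but allow me (since you use boost/bind):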
#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include <mutex>
#include <iostream>
#include <iomanip>
#define READ_BUFFER_SIZE 1024
namespace Utils {
    void print(char const* data, size_t n) {
        for (auto it = data; it < data + n; ++it) {
            int v = static_cast<uint8_t>(*it);
            std::cout << std::hex << std::setw(2) << std::setfill('0') << v << " ";
        }
        std::cout << "\n";
    }
}
struct SerializedBuffer {
    std::vector<char> _buf;
    size_t _pos = 0;
    size_t position() const { return _pos; }
    void position(size_t n) { _pos = n; }
    void rewind() { position(0); }
    char* bytes() { return _buf.data(); }
    size_t limit() { return _buf.size(); }
    void limit(size_t n) { _buf.resize(n); }
    SerializedBuffer(size_t n) : _buf(n) {}
};
struct client : boost::enable_shared_from_this<client> {
    client(boost::asio::io_service& svc) : _svc(svc), socket(_svc) {
        socket.connect({boost::asio::ip::address{}, 6767});
    }
    void doRead();
    SerializedBuffer* readBuffer = nullptr;
    void onRead(boost::system::error_code err, size_t nbytes);
    void stop() {
        socket.get_io_service().stop();
    }
    void onReceivedData(boost::shared_ptr<client> This, SerializedBuffer* sb) {
        std::cout << __PRETTY_FUNCTION__ << ": ";
        Utils::print(sb->bytes(), sb->limit());
        doRead();
    }
  private:
    boost::asio::io_service& _svc;
    boost::asio::ip::tcp::socket socket;
};
void client::doRead() {
    delete readBuffer; // getting rid of old data
    readBuffer = new SerializedBuffer((uint) READ_BUFFER_SIZE);
    readBuffer->position(0);
    boost::asio::async_read(socket, boost::asio::buffer(readBuffer->bytes(), READ_BUFFER_SIZE),
                            boost::asio::transfer_at_least(1),
                            boost::bind(&client::onRead, shared_from_this(), _1, _2));
}
std::mutex readMutex;
void client::onRead(boost::system::error_code err, size_t nbytes) {
    if (readMutex.try_lock()) {
        if (err) {
            stop();
            return;
        }
        readBuffer->rewind();
        uint limit = (uint) nbytes;
        readBuffer->limit(limit);
        // I've lost hope
        SerializedBuffer *sbuff = new SerializedBuffer(limit);
        memcpy(sbuff->bytes(), readBuffer->bytes(), limit);
        sbuff->limit(limit);
        // that's where I'm checking what I get
        Utils::print(sbuff->bytes(), limit);
        onReceivedData(shared_from_this(), sbuff);
        delete sbuff;
        readMutex.unlock();
    }
}
int main() {
    boost::asio::io_service svc;
    auto c = boost::make_shared<client>(svc);
    c->doRead();
    svc.run();
}
Which, when run against a mock server such as
(printf '\x38\n'; sleep 3; printf '\xFA\x6F\x73....\xBE\xBB\n') | netcat -l -p 6767
prints
38 0a
void client::onReceivedData(boost::shared_ptr<client>, SerializedBuffer*): 38 0a
fa 6f 73 2e 2e 2e 2e be bb 0a
void client::onReceivedData(boost::shared_ptr<client>, SerializedBuffer*): fa 6f 73 2e 2e 2e 2e be bb 0a
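As an aside on the try_lock point from the list of anti-patterns: in the handler above, the early return on error leaves readMutex locked, and a failed try_lock silently skips the read. A minimal sketch of the RAII alternative follows, using only the standard library. The helper name run_if_unlocked is hypothetical, and it uses std::try_to_lock rather than the std::adopt_lock / std::defer_lock tags mentioned earlier.

```cpp
#include <mutex>

// Hypothetical helper, not part of the question's code: runs `work` only if
// the mutex is free right now, and reports whether it ran.
template <typename Work>
bool run_if_unlocked(std::mutex& m, Work work) {
    std::unique_lock<std::mutex> lock(m, std::try_to_lock);
    if (!lock.owns_lock()) {
        // Busy (try_lock is also allowed to fail spuriously):
        // the caller has to handle this case explicitly.
        return false;
    }
    work();      // `lock` releases the mutex when it goes out of scope,
    return true; // including on an early return or an exception in work()
}
```

Because the unique_lock owns the mutex, there is no code path that returns while still holding it, and the boolean result forces the caller to decide what happens when the lock was not acquired.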
Of course, I haven't fixed the memory leak:
==31732==ERROR: LeakSanitizer: detected memory leaks
Direct leak of 32 byte(s) in 1 object(s) allocated from:
#0 0x7ff150beffb0 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.3+0xc7fb0)
#1 0x41fa58 in client::doRead() /home/sehe/Projects/stackoverflow/test.cpp:63
#2 0x44150f in client::onReceivedData(boost::shared_ptr<client>, SerializedBuffer*) /home/sehe/Projects/stackoverflow/test.cpp:53
#3 0x4200a6 in client::onRead(boost::system::error_code, unsigned long) /home/sehe/Projects/stackoverflow/test.cpp:90
#4 0x454ff0 in void boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>::call<boost::shared_ptr<client>, boost::system::error_code, unsigned long>(boost::shared_ptr<client>&, void const*, boost::system::error_code&, unsigned long&) const /home/sehe/
#5 0x4543bc in void boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>::operator()<boost::shared_ptr<client> >(boost::shared_ptr<client>&, boost::system::error_code, unsigned long) const /home/sehe/custom/boost/boost/bind/mem_fn_template.hpp:286
#6 0x4532a4 in void boost::_bi::list3<boost::_bi::value<boost::shared_ptr<client> >, boost::arg<1>, boost::arg<2> >::operator()<boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>, boost::_bi::rrlist2<boost::system::error_code const&, unsigned long
#7 0x452465 in void boost::_bi::bind_t<void, boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>, boost::_bi::list3<boost::_bi::value<boost::shared_ptr<client> >, boost::arg<1>, boost::arg<2> > >::operator()<boost::system::error_code const&, unsigne
#8 0x44f361 in boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::detail::transfer_at_least_t, boost::_bi::bind_t<void, boost::_mfi:
#9 0x456eb4 in boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::detail::transfer_at_least_t, boost::_
#10 0x456dd6 in void boost::asio::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio
#11 0x456d3a in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, bo
#12 0x456a6e in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boo
#13 0x4565ed in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, bo
#14 0x455cdb in boost::asio::detail::reactive_socket_recv_op<boost::asio::mutable_buffers_1, boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, b
#15 0x426f4b in boost::asio::detail::task_io_service_operation::complete(boost::asio::detail::task_io_service&, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/task_io_service_operation.hpp:38
#16 0x432dd5 in boost::asio::detail::epoll_reactor::descriptor_state::do_complete(boost::asio::detail::task_io_service*, boost::asio::detail::task_io_service_operation*, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/impl/epol
#17 0x426f4b in boost::asio::detail::task_io_service_operation::complete(boost::asio::detail::task_io_service&, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/task_io_service_operation.hpp:38
#18 0x4373af in boost::asio::detail::task_io_service::do_run_one(boost::asio::detail::scoped_lock<boost::asio::detail::posix_mutex>&, boost::asio::detail::task_io_service_thread_info&, boost::system::error_code const&) /home/sehe/custom/boost/boost/asio/detail/impl/task
#19 0x435739 in boost::asio::detail::task_io_service::run(boost::system::error_code&) /home/sehe/custom/boost/boost/asio/detail/impl/task_io_service.ipp:149
#20 0x438804 in boost::asio::io_service::run() /home/sehe/custom/boost/boost/asio/impl/io_service.ipp:59
#21 0x42024e in main /home/sehe/Projects/stackoverflow/test.cpp:102
#22 0x7ff14f09d82f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x2082f)
Indirect leak of 1024 byte(s) in 1 object(s) allocated from:
#0 0x7ff150beffb0 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.3+0xc7fb0)
#1 0x4537d4 in __gnu_cxx::new_allocator<char>::allocate(unsigned long, void const*) /usr/include/c++/6/ext/new_allocator.h:104
#2 0x4527bb in std::allocator_traits<std::allocator<char> >::allocate(std::allocator<char>&, unsigned long) /usr/include/c++/6/bits/alloc_traits.h:436
#3 0x451284 in std::_Vector_base<char, std::allocator<char> >::_M_allocate(unsigned long) /usr/include/c++/6/bits/stl_vector.h:170
#4 0x4514ea in std::_Vector_base<char, std::allocator<char> >::_M_create_storage(unsigned long) /usr/include/c++/6/bits/stl_vector.h:185
#5 0x44d8be in std::_Vector_base<char, std::allocator<char> >::_Vector_base(unsigned long, std::allocator<char> const&) /usr/include/c++/6/bits/stl_vector.h:136
#6 0x4494fc in std::vector<char, std::allocator<char> >::vector(unsigned long, std::allocator<char> const&) /usr/include/c++/6/bits/stl_vector.h:280
#7 0x440f09 in SerializedBuffer::SerializedBuffer(unsigned long) /home/sehe/Projects/stackoverflow/test.cpp:33
#8 0x41fa88 in client::doRead() /home/sehe/Projects/stackoverflow/test.cpp:63
#9 0x44150f in client::onReceivedData(boost::shared_ptr<client>, SerializedBuffer*) /home/sehe/Projects/stackoverflow/test.cpp:53
#10 0x4200a6 in client::onRead(boost::system::error_code, unsigned long) /home/sehe/Projects/stackoverflow/test.cpp:90
#11 0x454ff0 in void boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>::call<boost::shared_ptr<client>, boost::system::error_code, unsigned long>(boost::shared_ptr<client>&, void const*, boost::system::error_code&, unsigned long&) const /home/sehe
#12 0x4543bc in void boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>::operator()<boost::shared_ptr<client> >(boost::shared_ptr<client>&, boost::system::error_code, unsigned long) const /home/sehe/custom/boost/boost/bind/mem_fn_template.hpp:286
#13 0x4532a4 in void boost::_bi::list3<boost::_bi::value<boost::shared_ptr<client> >, boost::arg<1>, boost::arg<2> >::operator()<boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>, boost::_bi::rrlist2<boost::system::error_code const&, unsigned long
#14 0x452465 in void boost::_bi::bind_t<void, boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>, boost::_bi::list3<boost::_bi::value<boost::shared_ptr<client> >, boost::arg<1>, boost::arg<2> > >::operator()<boost::system::error_code const&, unsign
#15 0x44f361 in boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::detail::transfer_at_least_t, boost::_bi::bind_t<void, boost::_mfi
#16 0x456eb4 in boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::detail::transfer_at_least_t, boost::
#17 0x456dd6 in void boost::asio::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio
#18 0x456d3a in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, bo
#19 0x456a6e in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boo
#20 0x4565ed in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, bo
#21 0x455cdb in boost::asio::detail::reactive_socket_recv_op<boost::asio::mutable_buffers_1, boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, b
#22 0x426f4b in boost::asio::detail::task_io_service_operation::complete(boost::asio::detail::task_io_service&, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/task_io_service_operation.hpp:38
#23 0x432dd5 in boost::asio::detail::epoll_reactor::descriptor_state::do_complete(boost::asio::detail::task_io_service*, boost::asio::detail::task_io_service_operation*, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/impl/epol
#24 0x426f4b in boost::asio::detail::task_io_service_operation::complete(boost::asio::detail::task_io_service&, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/task_io_service_operation.hpp:38
#25 0x4373af in boost::asio::detail::task_io_service::do_run_one(boost::asio::detail::scoped_lock<boost::asio::detail::posix_mutex>&, boost::asio::detail::task_io_service_thread_info&, boost::system::error_code const&) /home/sehe/custom/boost/boost/asio/detail/impl/task
#26 0x435739 in boost::asio::detail::task_io_service::run(boost::system::error_code&) /home/sehe/custom/boost/boost/asio/detail/impl/task_io_service.ipp:149
#27 0x438804 in boost::asio::io_service::run() /home/sehe/custom/boost/boost/asio/impl/io_service.ipp:59
#28 0x42024e in main /home/sehe/Projects/stackoverflow/test.cpp:102
#29 0x7ff14f09d82f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x2082f)
SUMMARY: AddressSanitizer: 1056 byte(s) leaked in 2 allocation(s).
Simply use unique_ptr and move ownership where needed:
#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include <cstdint>
#include <iostream>
#include <iomanip>
#include <memory>
#include <vector>
#define READ_BUFFER_SIZE 1024u
namespace Utils {
    // Hex-dump n bytes to stdout, followed by a newline.
    void print(char const* data, size_t n) {
        for (auto it = data; it < data + n; ++it) {
            int v = static_cast<uint8_t>(*it);
            std::cout << std::hex << std::setw(2) << std::setfill('0') << v << " ";
        }
        std::cout << "\n";
    }
}
struct SerializedBuffer {
    std::vector<char> _buf;
    size_t _pos = 0;
    size_t position() const { return _pos; }
    void position(size_t n) { _pos = n; }
    void rewind() { position(0); }
    char* bytes() { return _buf.data(); }
    size_t limit() { return _buf.size(); }
    void limit(size_t n) { _buf.resize(n); }
    SerializedBuffer(size_t n) : _buf(n) {}
};
struct client : boost::enable_shared_from_this<client> {
    client(boost::asio::io_service& svc) : _svc(svc), socket(_svc) {
        socket.connect({boost::asio::ip::address{}, 6767});
    }
    void doRead();
    void onRead(boost::system::error_code err, size_t nbytes);
    void stop() {
        socket.get_io_service().stop();
    }
    // Takes ownership of the buffer; it is freed when sb goes out of scope.
    void onReceivedData(boost::shared_ptr<client> This, std::unique_ptr<SerializedBuffer> sb) {
        std::cout << __PRETTY_FUNCTION__ << ": ";
        Utils::print(sb->bytes(), sb->limit());
        doRead();
    }
  private:
    std::unique_ptr<SerializedBuffer> readBuffer;
    boost::asio::io_service& _svc;
    boost::asio::ip::tcp::socket socket;
};
void client::doRead() {
    // A fresh buffer per read; any previous one is released by reset().
    readBuffer.reset(new SerializedBuffer(READ_BUFFER_SIZE));
    boost::asio::async_read(socket, boost::asio::buffer(readBuffer->bytes(), READ_BUFFER_SIZE),
                            boost::asio::transfer_at_least(1),
                            boost::bind(&client::onRead, shared_from_this(), _1, _2));
}
void client::onRead(boost::system::error_code err, size_t nbytes) {
    if (err) {
        stop();
        return;
    }
    readBuffer->rewind();
    readBuffer->limit(nbytes); // shrink to the number of bytes actually read
    onReceivedData(shared_from_this(), std::move(readBuffer));
}
int main() {
    boost::asio::io_service svc;
    auto c = boost::make_shared<client>(svc);
    c->doRead();
    svc.run();
}
Prints:
$ ./sotest
void client::onReceivedData(boost::shared_ptr<client>, std::unique_ptr<SerializedBuffer>): 38 0a fa 6f 73 2e 2e 2e 2e be bb 0a
or
$ ./sotest
void client::onReceivedData(boost::shared_ptr<client>, std::unique_ptr<SerializedBuffer>): 38 0a
void client::onReceivedData(boost::shared_ptr<client>, std::unique_ptr<SerializedBuffer>): fa 6f 73 2e 2e 2e 2e be bb 0a
depending on timing.
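The two outputs differ because TCP delivers a byte stream: one read may return the bytes of several writes, or only part of one. If the application needs message boundaries, it has to reconstruct them itself. Below is a minimal sketch of one common way to do that, assuming a hypothetical protocol in which every message is a single length byte followed by that many payload bytes. Nothing in the question confirms that this is the actual wire format, and FrameDecoder is an illustrative name, not an Asio facility.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Assumed (hypothetical) framing: one length byte, then `length` payload bytes.
class FrameDecoder {
  public:
    // Append the bytes produced by one read, then return every message
    // that is now complete. Incomplete data stays buffered for the next call.
    std::vector<std::string> feed(const char* data, std::size_t n) {
        pending_.append(data, n);
        std::vector<std::string> messages;
        std::size_t offset = 0;
        while (offset < pending_.size()) {
            std::size_t len = static_cast<std::uint8_t>(pending_[offset]);
            if (pending_.size() - offset - 1 < len) {
                break; // payload not fully received yet
            }
            messages.emplace_back(pending_, offset + 1, len);
            offset += 1 + len;
        }
        pending_.erase(0, offset); // drop the bytes that were consumed
        return messages;
    }

  private:
    std::string pending_; // bytes received but not yet part of a full message
};
```

A read handler such as onRead could pass its bytes and nbytes to feed() and dispatch each returned message, so the result no longer depends on how the writes happened to be split across reads.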