boost::asio duplicates input data in async_read

Vla*_*hin 1 c++ boost tcp client-server boost-asio

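I recently switched from libevent to boost::asio, and after a week I noticed something strange: when I read data from a client, some of that data appears to be duplicated, as if the library had not marked it as read (or something like that). My "read" method looks like this: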

void client::doRead() {
    delete readBuffer; // getting rid of old data 
    readBuffer = new SerializedBuffer((uint) READ_BUFFER_SIZE);
    readBuffer->position(0);
    asio::async_read(socket, asio::buffer(readBuffer->bytes(), READ_BUFFER_SIZE),
            asio::transfer_at_least(1),
            boost::bind(&client::onRead, shared_from_this(), _1, _2));
}

std::mutex readMutex;
void client::onRead(const asio::error_code &err, size_t nbytes) {
    if (readMutex.try_lock()) {
        if (err) {
            stop();
            return;
        }

        readBuffer->rewind();
        uint limit = (uint) nbytes;
        readBuffer->limit(limit);

        // I've lost hope
        SerializedBuffer *sbuff = new SerializedBuffer(limit);
        memcpy(sbuff->bytes(), readBuffer->bytes(), limit);
        sbuff->limit(limit);

        // that's where I'm checking what I get
        Utils::print(sbuff->bytes(), limit);

        onReceivedData(shared_from_this(), sbuff);
        delete sbuff;
        readMutex.unlock();
    }
}

For example, the client sends data like this:

<< 38
<< FA 6F 73 ... BE BB

(two separate writes of binary data)

But the server receives:

>> FA
>> FA 6F 73 ... BE BB

"Where did the byte 38 go?" I ask myself.
This happens in roughly 10% of cases, after 3-4 packets have been transferred successfully.

seh*_*ehe 5

Too many anti-patterns.

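  • Don't use new/delete.
  • Don't use non-RAII locks (try_lock is a smell).
  • If you do use try_lock, handle all return values! (And use std::adopt_lock / std::defer_lock.)
  • Don't use locks with asynchronous operations (asio::io_service::strand is what you need).
  • Don't write your own buffer implementation unless you have to, especially when a good abstraction such as asio::const_buffer already exists. Why does print take a raw pointer + size? Boost has string_ref, the standard library has string_view, and Asio has the asio::buffer facilities.
  • If your class has position() / position(uint) members, it is a quasi-class and you might as well expose the member field.
  • Just make READ_BUFFER_SIZE unsigned (e.g. #define RBS 1024u).
  • Initialize SerializedBuffer properly instead of requiring readBuffer->position(0); immediately after construction.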
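
To illustrate the points about locks: in the question's onRead, the `return` in the error branch leaves the function without calling readMutex.unlock(), so any later try_lock fails and the handler body is silently skipped. The following is a minimal sketch of an RAII alternative using std::unique_lock with std::try_to_lock. The function onReadGuarded and the enum ReadResult are hypothetical names invented for this illustration; this is not claimed to be the cause of the reported duplication, and a strand remains the better tool for asynchronous handlers.

```cpp
#include <cassert>
#include <mutex>

std::mutex readMutex;

// Hypothetical result type so that every outcome of the lock attempt is
// reported to the caller instead of being dropped.
enum class ReadResult { Handled, Failed, Busy };

// Hypothetical stand-in for the handler in the question.
ReadResult onReadGuarded(bool hasError) {
    // try_to_lock attempts the lock without blocking. The guard releases the
    // mutex on every exit path, including the early return below.
    std::unique_lock<std::mutex> guard(readMutex, std::try_to_lock);
    if (!guard.owns_lock()) {
        return ReadResult::Busy;   // the "could not lock" case is handled explicitly
    }
    assert(guard.owns_lock());     // from here on the guard owns the mutex

    if (hasError) {
        return ReadResult::Failed; // the mutex is still released here
    }
    return ReadResult::Handled;
}
```

Calling onReadGuarded(true) and then onReadGuarded(false) from the same thread yields Failed followed by Handled, because the guard's destructor released the mutex after the failing call.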

SSCCE

I know you are using non-Boost Asio, but allow me (since you use boost/bind anyway):

Live On Coliru

#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include <mutex>
#include <iostream>
#include <iomanip>

#define READ_BUFFER_SIZE 1024

namespace Utils {
    void print(char const* data, size_t n) {
        for (auto it=data; it <data+n; ++it) {
            int v = static_cast<uint8_t>(*it);
            std::cout << std::hex << std::setw(2) << std::setfill('0') << v << " ";
        }
        std::cout << "\n";
    }
}

struct SerializedBuffer {
    std::vector<char> _buf;
    size_t _pos = 0;

    size_t position() const { return _pos; }
    void position(size_t n) { _pos = n; }
    void rewind() { position(0); }
    char* bytes() { return _buf.data(); }

    size_t limit() { return _buf.size(); }
    void limit(size_t n) { _buf.resize(n); }

    SerializedBuffer(size_t n) : _buf(n) {}
};

struct client : boost::enable_shared_from_this<client> {
    client(boost::asio::io_service& svc) : _svc(svc), socket(_svc) {
        socket.connect({boost::asio::ip::address{}, 6767});
    }
    void doRead();

    SerializedBuffer* readBuffer = nullptr;
    void onRead(boost::system::error_code err, size_t nbytes);

    void stop() {
        socket.get_io_service().stop();
    }

    void onReceivedData(boost::shared_ptr<client> This, SerializedBuffer* sb) {
        std::cout << __PRETTY_FUNCTION__ << ": ";
        Utils::print(sb->bytes(), sb->limit());

        doRead();
    }

  private:
    boost::asio::io_service& _svc;
    boost::asio::ip::tcp::socket socket;
};

void client::doRead() {
    delete readBuffer; // getting rid of old data 
    readBuffer = new SerializedBuffer((uint) READ_BUFFER_SIZE);
    readBuffer->position(0);
    boost::asio::async_read(socket, boost::asio::buffer(readBuffer->bytes(), READ_BUFFER_SIZE),
            boost::asio::transfer_at_least(1),
            boost::bind(&client::onRead, shared_from_this(), _1, _2));
}

std::mutex readMutex;
void client::onRead(boost::system::error_code err, size_t nbytes) {
    if (readMutex.try_lock()) {
        if (err) {
            stop();
            return;
        }

        readBuffer->rewind();
        uint limit = (uint) nbytes;
        readBuffer->limit(limit);

        // I've lost hope
        SerializedBuffer *sbuff = new SerializedBuffer(limit);
        memcpy(sbuff->bytes(), readBuffer->bytes(), limit);
        sbuff->limit(limit);

        // that's where I'm checking what I get
        Utils::print(sbuff->bytes(), limit);

        onReceivedData(shared_from_this(), sbuff);
        delete sbuff;
        readMutex.unlock();
    }
}

int main() {
    boost::asio::io_service svc;

    auto c = boost::make_shared<client>(svc);
    c->doRead();

    svc.run();
}

Which, when run against a mock server like this:

(printf '\x38\n'; sleep 3; printf '\xFA\x6F\x73....\xBE\xBB\n') | netcat -l -p 6767

Prints

38 0a 
void client::onReceivedData(boost::shared_ptr<client>, SerializedBuffer*): 38 0a 
fa 6f 73 2e 2e 2e 2e be bb 0a 
void client::onReceivedData(boost::shared_ptr<client>, SerializedBuffer*): fa 6f 73 2e 2e 2e 2e be bb 0a 

Of course, I did not fix the memory leak:

==31732==ERROR: LeakSanitizer: detected memory leaks

Direct leak of 32 byte(s) in 1 object(s) allocated from:
    #0 0x7ff150beffb0 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.3+0xc7fb0)
    #1 0x41fa58 in client::doRead() /home/sehe/Projects/stackoverflow/test.cpp:63
    #2 0x44150f in client::onReceivedData(boost::shared_ptr<client>, SerializedBuffer*) /home/sehe/Projects/stackoverflow/test.cpp:53
    #3 0x4200a6 in client::onRead(boost::system::error_code, unsigned long) /home/sehe/Projects/stackoverflow/test.cpp:90
    #4 0x454ff0 in void boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>::call<boost::shared_ptr<client>, boost::system::error_code, unsigned long>(boost::shared_ptr<client>&, void const*, boost::system::error_code&, unsigned long&) const /home/sehe/
    #5 0x4543bc in void boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>::operator()<boost::shared_ptr<client> >(boost::shared_ptr<client>&, boost::system::error_code, unsigned long) const /home/sehe/custom/boost/boost/bind/mem_fn_template.hpp:286
    #6 0x4532a4 in void boost::_bi::list3<boost::_bi::value<boost::shared_ptr<client> >, boost::arg<1>, boost::arg<2> >::operator()<boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>, boost::_bi::rrlist2<boost::system::error_code const&, unsigned long 
    #7 0x452465 in void boost::_bi::bind_t<void, boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>, boost::_bi::list3<boost::_bi::value<boost::shared_ptr<client> >, boost::arg<1>, boost::arg<2> > >::operator()<boost::system::error_code const&, unsigne
    #8 0x44f361 in boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::detail::transfer_at_least_t, boost::_bi::bind_t<void, boost::_mfi:
    #9 0x456eb4 in boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::detail::transfer_at_least_t, boost::_
    #10 0x456dd6 in void boost::asio::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio
    #11 0x456d3a in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, bo
    #12 0x456a6e in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boo
    #13 0x4565ed in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, bo
    #14 0x455cdb in boost::asio::detail::reactive_socket_recv_op<boost::asio::mutable_buffers_1, boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, b
    #15 0x426f4b in boost::asio::detail::task_io_service_operation::complete(boost::asio::detail::task_io_service&, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/task_io_service_operation.hpp:38
    #16 0x432dd5 in boost::asio::detail::epoll_reactor::descriptor_state::do_complete(boost::asio::detail::task_io_service*, boost::asio::detail::task_io_service_operation*, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/impl/epol
    #17 0x426f4b in boost::asio::detail::task_io_service_operation::complete(boost::asio::detail::task_io_service&, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/task_io_service_operation.hpp:38
    #18 0x4373af in boost::asio::detail::task_io_service::do_run_one(boost::asio::detail::scoped_lock<boost::asio::detail::posix_mutex>&, boost::asio::detail::task_io_service_thread_info&, boost::system::error_code const&) /home/sehe/custom/boost/boost/asio/detail/impl/task
    #19 0x435739 in boost::asio::detail::task_io_service::run(boost::system::error_code&) /home/sehe/custom/boost/boost/asio/detail/impl/task_io_service.ipp:149
    #20 0x438804 in boost::asio::io_service::run() /home/sehe/custom/boost/boost/asio/impl/io_service.ipp:59
    #21 0x42024e in main /home/sehe/Projects/stackoverflow/test.cpp:102
    #22 0x7ff14f09d82f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x2082f)

Indirect leak of 1024 byte(s) in 1 object(s) allocated from:
    #0 0x7ff150beffb0 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.3+0xc7fb0)
    #1 0x4537d4 in __gnu_cxx::new_allocator<char>::allocate(unsigned long, void const*) /usr/include/c++/6/ext/new_allocator.h:104
    #2 0x4527bb in std::allocator_traits<std::allocator<char> >::allocate(std::allocator<char>&, unsigned long) /usr/include/c++/6/bits/alloc_traits.h:436
    #3 0x451284 in std::_Vector_base<char, std::allocator<char> >::_M_allocate(unsigned long) /usr/include/c++/6/bits/stl_vector.h:170
    #4 0x4514ea in std::_Vector_base<char, std::allocator<char> >::_M_create_storage(unsigned long) /usr/include/c++/6/bits/stl_vector.h:185
    #5 0x44d8be in std::_Vector_base<char, std::allocator<char> >::_Vector_base(unsigned long, std::allocator<char> const&) /usr/include/c++/6/bits/stl_vector.h:136
    #6 0x4494fc in std::vector<char, std::allocator<char> >::vector(unsigned long, std::allocator<char> const&) /usr/include/c++/6/bits/stl_vector.h:280
    #7 0x440f09 in SerializedBuffer::SerializedBuffer(unsigned long) /home/sehe/Projects/stackoverflow/test.cpp:33
    #8 0x41fa88 in client::doRead() /home/sehe/Projects/stackoverflow/test.cpp:63
    #9 0x44150f in client::onReceivedData(boost::shared_ptr<client>, SerializedBuffer*) /home/sehe/Projects/stackoverflow/test.cpp:53
    #10 0x4200a6 in client::onRead(boost::system::error_code, unsigned long) /home/sehe/Projects/stackoverflow/test.cpp:90
    #11 0x454ff0 in void boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>::call<boost::shared_ptr<client>, boost::system::error_code, unsigned long>(boost::shared_ptr<client>&, void const*, boost::system::error_code&, unsigned long&) const /home/sehe
    #12 0x4543bc in void boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>::operator()<boost::shared_ptr<client> >(boost::shared_ptr<client>&, boost::system::error_code, unsigned long) const /home/sehe/custom/boost/boost/bind/mem_fn_template.hpp:286
    #13 0x4532a4 in void boost::_bi::list3<boost::_bi::value<boost::shared_ptr<client> >, boost::arg<1>, boost::arg<2> >::operator()<boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>, boost::_bi::rrlist2<boost::system::error_code const&, unsigned long
    #14 0x452465 in void boost::_bi::bind_t<void, boost::_mfi::mf2<void, client, boost::system::error_code, unsigned long>, boost::_bi::list3<boost::_bi::value<boost::shared_ptr<client> >, boost::arg<1>, boost::arg<2> > >::operator()<boost::system::error_code const&, unsign
    #15 0x44f361 in boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::detail::transfer_at_least_t, boost::_bi::bind_t<void, boost::_mfi
    #16 0x456eb4 in boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio::detail::transfer_at_least_t, boost::
    #17 0x456dd6 in void boost::asio::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boost::asio
    #18 0x456d3a in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, bo
    #19 0x456a6e in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, boo
    #20 0x4565ed in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, bo
    #21 0x455cdb in boost::asio::detail::reactive_socket_recv_op<boost::asio::mutable_buffers_1, boost::asio::detail::read_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp> >, boost::asio::mutable_buffers_1, b
    #22 0x426f4b in boost::asio::detail::task_io_service_operation::complete(boost::asio::detail::task_io_service&, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/task_io_service_operation.hpp:38
    #23 0x432dd5 in boost::asio::detail::epoll_reactor::descriptor_state::do_complete(boost::asio::detail::task_io_service*, boost::asio::detail::task_io_service_operation*, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/impl/epol
    #24 0x426f4b in boost::asio::detail::task_io_service_operation::complete(boost::asio::detail::task_io_service&, boost::system::error_code const&, unsigned long) /home/sehe/custom/boost/boost/asio/detail/task_io_service_operation.hpp:38
    #25 0x4373af in boost::asio::detail::task_io_service::do_run_one(boost::asio::detail::scoped_lock<boost::asio::detail::posix_mutex>&, boost::asio::detail::task_io_service_thread_info&, boost::system::error_code const&) /home/sehe/custom/boost/boost/asio/detail/impl/task
    #26 0x435739 in boost::asio::detail::task_io_service::run(boost::system::error_code&) /home/sehe/custom/boost/boost/asio/detail/impl/task_io_service.ipp:149
    #27 0x438804 in boost::asio::io_service::run() /home/sehe/custom/boost/boost/asio/impl/io_service.ipp:59
    #28 0x42024e in main /home/sehe/Projects/stackoverflow/test.cpp:102
    #29 0x7ff14f09d82f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x2082f)

SUMMARY: AddressSanitizer: 1056 byte(s) leaked in 2 allocation(s).

Fixing the memory allocation

Just use unique_ptr, moving ownership where required:

#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include <memory>
#include <vector>
#include <iostream>
#include <iomanip>

#define READ_BUFFER_SIZE 1024u

namespace Utils {
    void print(char const* data, size_t n) {
        for (auto it=data; it <data+n; ++it) {
            int v = static_cast<uint8_t>(*it);
            std::cout << std::hex << std::setw(2) << std::setfill('0') << v << " ";
        }
        std::cout << "\n";
    }
}

struct SerializedBuffer {
    std::vector<char> _buf;
    size_t _pos = 0;

    size_t position() const { return _pos; }
    void position(size_t n) { _pos = n; }
    void rewind() { position(0); }
    char* bytes() { return _buf.data(); }

    size_t limit() { return _buf.size(); }
    void limit(size_t n) { _buf.resize(n); }

    SerializedBuffer(size_t n) : _buf(n) {}
};

struct client : boost::enable_shared_from_this<client> {
    client(boost::asio::io_service& svc) : _svc(svc), socket(_svc) {
        socket.connect({boost::asio::ip::address{}, 6767});
    }
    void doRead();

    void onRead(boost::system::error_code err, size_t nbytes);

    void stop() {
        socket.get_io_service().stop();
    }

    void onReceivedData(boost::shared_ptr<client> This, std::unique_ptr<SerializedBuffer> sb) {
        std::cout << __PRETTY_FUNCTION__ << ": ";
        Utils::print(sb->bytes(), sb->limit());

        doRead();
    }

  private:
    std::unique_ptr<SerializedBuffer> readBuffer;
    boost::asio::io_service& _svc;
    boost::asio::ip::tcp::socket socket;
};

void client::doRead() {
    readBuffer.reset(new SerializedBuffer(READ_BUFFER_SIZE));
    boost::asio::async_read(socket, boost::asio::buffer(readBuffer->bytes(), READ_BUFFER_SIZE),
            boost::asio::transfer_at_least(1),
            boost::bind(&client::onRead, shared_from_this(), _1, _2));
}

void client::onRead(boost::system::error_code err, size_t nbytes) {
    if (err) {
        stop();
        return;
    }

    readBuffer->rewind();
    readBuffer->limit(nbytes);

    onReceivedData(shared_from_this(), std::move(readBuffer));
}

int main() {
    boost::asio::io_service svc;

    auto c = boost::make_shared<client>(svc);
    c->doRead();

    svc.run();
}

Prints:

$ ./sotest 
void client::onReceivedData(boost::shared_ptr<client>, std::unique_ptr<SerializedBuffer>): 38 0a fa 6f 73 2e 2e 2e 2e be bb 0a 

or

$ ./sotest 
void client::onReceivedData(boost::shared_ptr<client>, std::unique_ptr<SerializedBuffer>): 38 0a 
void client::onReceivedData(boost::shared_ptr<client>, std::unique_ptr<SerializedBuffer>): fa 6f 73 2e 2e 2e 2e be bb 0a 

depending on timing.

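  • @VladMarkushin I am /sad/ that you don't realize this is exactly why it is impossible to help. You are proud of not having done anything new. But the problem is that you have not shown the part that produces the problem. And yes, all the code smells give me good reason to think you did create UB somewhere else (/sf/ask/3164880351/#comment77397477_45212576 ). Show the code that you are writing differently. Then we can help. (5 upvotes)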