Boost ASIO:向所有连接的客户端发送消息

Fak*_*ame 3 c++ boost boost-asio

我正在开发一个涉及boost::beastwebsocket/http混合服务器的项目,该服务器运行在上面boost::asio.我的项目advanced_server.cpp源于示例源代码.

它工作正常,但现在我正在尝试添加一项功能,该功能需要向所有连接的客户端发送消息.

我不是很熟悉boost::asio,但是现在我看不到有任何方式可以播放"广播"事件(如果这是正确的术语).

我天真的方法是看看我是否可以构造websocket_session()附加类似事件监听器的东西,并且析构函数会分离监听器.此时,我可以触发事件,并使所有当前有效的websocket会话(websocket_session()作用域的生命周期)执行回调.

/sf/answers/1192031571/,它或多或少地做了我想要的(ab)使用a boost::asio::steady_timer,但这似乎是一种可怕的黑客来完成应该非常简单的事情.

基本上,给定一个有状态boost::asio服务器,我如何在多个连接上进行操作?

seh*_*ehe 7

首先:您可以广播UDP,但这不是连接客户端.那只是...... UDP.

其次,该链接显示了如何在Asio中使用条件变量(事件)类接口.这只是你问题的一小部分.你忘记了大局:你需要知道一组开放的连接,不管是这样:

  1. 例如,weak_ptr为每个连接保留一个会话指针容器()
  2. 每个连接订阅一个信号槽(例如Boost Signals).

选项1.非常适合性能,选项2更适合灵活性(将事件源与订户分离,使得异构订户成为可能,例如不是来自连接).

因为我认为选项1对于线程来说简单得多,效率更高(你可以例如从一个缓冲区为所有客户端提供服务而不复制)并且你可能不需要双重地解耦信号/插槽,让我参考一个答案在那里我已经展示了纯粹的Asio(没有Beast):

它显示了"连接池"的概念 - 它本质上是weak_ptr<connection>具有一些垃圾收集逻辑的对象的线程安全容器.

演示:介绍Echo Server

谈论了我想花时间实际展示这两种方法的事情之后,所以我完全清楚我在说什么.

首先让我们介绍一个简单的,随意的磨合异步TCP服务器

  • 具有多个并发连接
  • 每个连接的会话逐行从客户端读取,并将相同的回送回客户端
  • 3秒后停止接受,并在最后一个客户端断开连接后退出

master branch on github

#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
            if (enqueue(std::move(msg), at_front))
                write_loop();
        });
    }

  private:
    void do_echo() {
        std::string line;
        if (getline(std::istream(&_rx), line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front)
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf          _rx;
    std::list<std::string> _tx;
    tcp::socket            _s;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        _acc.bind({{}, 6767});
        _acc.set_option(tcp::acceptor::reuse_address());
        _acc.listen();
        accept_loop();
    }

    void stop() {
        _ioc.post([=] {
                _acc.cancel();
                _acc.close();
            });
    }

  private:
    void accept_loop() {
        auto session = std::make_shared<connection>(_acc.get_io_context());
        _acc.async_accept(session->_s, [this,session](error_code ec) {
             auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
             std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

             session->start();
             if (!ec)
                 accept_loop();
        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(3s);
    s.stop(); // active connections will continue

    th.join();
}
Run Code Online (Sandbox Code Playgroud)

方法1.添加广播消息

所以,让我们添加同时发送到所有活动连接的"广播消息".我们加两个:

  • 每个新连接一个(说"播放器##已进入游戏")
  • 模仿全局"服务器事件"的问题,就像您在问题中描述的那样).它从main内部触发:

    std::this_thread::sleep_for(1s);
    
    auto n = s.broadcast("random global event broadcast\n");
    std::cout << "Global event broadcast reached " << n << " active connections\n";
    
    Run Code Online (Sandbox Code Playgroud)

注意我们如何通过向每个接受的连接注册一个弱指针并对每个连接进行操作来实现此目的:

    _acc.async_accept(session->_s, [this,session](error_code ec) {
         auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
         std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

         if (!ec) {
             auto n = reg_connection(session);

             session->start();
             accept_loop();

             broadcast("player #" + std::to_string(n) + " has entered the game\n");
         }

    });
Run Code Online (Sandbox Code Playgroud)

broadcast也直接使用main,简单地说:

size_t broadcast(std::string const& msg) {
    return for_each_active([msg](connection& c) { c.send(msg, true); });
}
Run Code Online (Sandbox Code Playgroud)

using-asio-post branch on github

#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
            if (enqueue(std::move(msg), at_front))
                write_loop();
        });
    }

  private:
    void do_echo() {
        std::string line;
        if (getline(std::istream(&_rx), line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front)
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf          _rx;
    std::list<std::string> _tx;
    tcp::socket            _s;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        _acc.bind({{}, 6767});
        _acc.set_option(tcp::acceptor::reuse_address());
        _acc.listen();
        accept_loop();
    }

    void stop() {
        _ioc.post([=] {
                _acc.cancel();
                _acc.close();
            });
    }

    size_t broadcast(std::string const& msg) {
        return for_each_active([msg](connection& c) { c.send(msg, true); });
    }

  private:
    using connptr = std::shared_ptr<connection>;
    using weakptr = std::weak_ptr<connection>;

    std::mutex _mx;
    std::vector<weakptr> _registered;

    size_t reg_connection(weakptr wp) {
        std::lock_guard<std::mutex> lk(_mx);
        _registered.push_back(wp);
        return _registered.size();
    }

    template <typename F>
    size_t for_each_active(F f) {
        std::vector<connptr> active;
        {
            std::lock_guard<std::mutex> lk(_mx);
            for (auto& w : _registered)
                if (auto c = w.lock())
                    active.push_back(c);
        }

        for (auto& c : active) {
            std::cout << "(running action for " << c->_s.remote_endpoint() << ")" << std::endl;
            f(*c);
        }

        return active.size();
    }

    void accept_loop() {
        auto session = std::make_shared<connection>(_acc.get_io_context());
        _acc.async_accept(session->_s, [this,session](error_code ec) {
             auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
             std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

             if (!ec) {
                 auto n = reg_connection(session);

                 session->start();
                 accept_loop();

                 broadcast("player #" + std::to_string(n) + " has entered the game\n");
             }

        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(1s);

    auto n = s.broadcast("random global event broadcast\n");
    std::cout << "Global event broadcast reached " << n << " active connections\n";

    std::this_thread::sleep_for(2s);
    s.stop(); // active connections will continue

    th.join();
}
Run Code Online (Sandbox Code Playgroud)

方法2:那些广播但具有提升信号2

信号方法是依赖性反转的一个很好的例子.

最突出的笔记:

  • 在调用它的线程上调用信号槽("引发事件")
  • scoped_connection是有这么订阅*自动的删除时connection被破坏
  • 控制台消息的措辞与"达到#活动连接"到"已达到#活跃用户 " 的细微差别.

差异是理解增加灵活性的关键:信号所有者/调用者对订阅者一无所知.这就是我们正在谈论的解耦/依赖倒置

using-signals2 branch on github

#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
#include <boost/signals2.hpp>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
            if (enqueue(std::move(msg), at_front))
                write_loop();
        });
    }

  private:
    void do_echo() {
        std::string line;
        if (getline(std::istream(&_rx), line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front)
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf          _rx;
    std::list<std::string> _tx;
    tcp::socket            _s;

    boost::signals2::scoped_connection _subscription;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        _acc.bind({{}, 6767});
        _acc.set_option(tcp::acceptor::reuse_address());
        _acc.listen();
        accept_loop();
    }

    void stop() {
        _ioc.post([=] {
                _acc.cancel();
                _acc.close();
            });
    }

    size_t broadcast(std::string const& msg) {
        _broadcast_event(msg);
        return _broadcast_event.num_slots();
    }

  private:
    boost::signals2::signal<void(std::string const& msg)> _broadcast_event;

    size_t reg_connection(connection& c) {
        c._subscription = _broadcast_event.connect(
                [&c](std::string msg){ c.send(msg, true); }
            );

        return _broadcast_event.num_slots();
    }

    void accept_loop() {
        auto session = std::make_shared<connection>(_acc.get_io_context());
        _acc.async_accept(session->_s, [this,session](error_code ec) {
             auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
             std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

             if (!ec) {
                 auto n = reg_connection(*session);

                 session->start();
                 accept_loop();

                 broadcast("player #" + std::to_string(n) + " has entered the game\n");
             }

        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(1s);

    auto n = s.broadcast("random global event broadcast\n");
    std::cout << "Global event broadcast reached " << n << " active subscribers\n";

    std::this_thread::sleep_for(2s);
    s.stop(); // active connections will continue

    th.join();
}
Run Code Online (Sandbox Code Playgroud)

见方法1和2之间的区别: Compare View on github

针对3个并发客户端运行时的输出示例:

(for a in {1..3}; do netcat localhost 6767 < /etc/dictionaries-common/words > echoed.$a& sleep .1; done; time wait)
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述


Vin*_*lco 5

@sehe 的回答非常棒,所以我就简单介绍一下。一般来说,要实现对所有活动连接进行操作的算法,您必须执行以下操作:

  • 维护活动连接列表。如果这个列表被多个线程访问,则需要同步( std::mutex)。新连接应插入到列表中,当连接被破坏或变为非活动状态时,应将其从列表中删除。

  • 要迭代列表,如果列表由多个线程访问(即多个线程调用asio::io_context::run,或者如果列表也从不调用 的线程访问asio::io_context::run) ,则需要同步

  • 在迭代期间,如果算法需要检查或修改任何连接的状态,并且该状态可以由其他线程更改,则需要额外的同步。这包括连接对象存储的任何内部消息“队列”。

  • 同步连接对象的一种简单方法是使用boost::asio::post提交一个函数以在连接对象的上下文上执行,该函数可以是显式链(boost::asio::strand如高级服务器示例中所示)或隐式链(当只有一个时您会得到什么)线程调用io_context::run)。@sehe 提供的方法 1 用于post以这种方式进行同步。

  • 同步连接对象的另一种方法是“停止世界”。这意味着调用io_context::stop,等待所有线程退出,然后保证没有其他线程正在访问连接列表。然后您可以根据需要读取和写入连接对象状态。当您完成连接列表后,调用io_context::restart并启动再次调用的线程io_context::run。停止io_context并不会停止网络活动,内核和网络驱动程序仍然从内部缓冲区发送和接收数据。TCP/IP 流量控制将处理所有事情,因此应用程序仍然可以顺利运行,即使它在“停止世界”期间短暂无响应。这种方法可以简化事情,但根据您的特定应用程序,您必须评估它是否适合您。

希望这可以帮助!