c++ - Boost ASIO 网络服务器/客户端

hel*_*lix 4 c++ boost network-programming

我已经为客户端和服务器编写了一个程序。该程序当前执行以下操作:

  1. 服务器侦听连接的端点
  2. 客户端连接到服务器
  3. 服务器在接受连接时发送消息
  4. 客户端收到消息

我正在异步执行此操作。但是,问题是他们只能发送/接收一条消息。之后,他们就终止了。下面是我的代码:

#include <iostream>
#include <vector>
#include<boost/asio.hpp>

std::vector<char> buff(256);

void SendHandler(boost::system::error_code ex){
    std::cout << " do something here" << std::endl;
}

void ReadHandler(boost::system::error_code ex){
    std::cout << " print the buffer data..." << std::endl;
    std::cout << buff.data() << std::endl;

}

void Server(){
    boost::asio::io_service service;
    using namespace boost::asio::ip;
    tcp::endpoint endpoint(tcp::v4(), 4000);
    tcp::acceptor acceptor(service, endpoint); 
    tcp::socket socket(service);
    std::cout << "[Server] Waiting for connection" << std::endl;


    acceptor.accept(socket);
    std::cout << "[Server] Accepted a connection from client" << std::endl;

    std::string msg = "Message from server";
    socket.async_send(boost::asio::buffer(msg), SendHandler);
    service.run();

}

void Client(){
    boost::asio::io_service service;
    using namespace boost::asio::ip;
    tcp::endpoint endpoint(address::from_string("127.0.0.1"), 4000);
    tcp::socket socket(service);
    std::cout << "[Client] Connecting to server..." << std::endl;
    socket.connect(endpoint);
    std::cout << "[Client] Connection successful" << std::endl;

    socket.async_read_some(boost::asio::buffer(buff), ReadHandler);
    service.run();
}

int main(int argc, char **argv) {
    if(argc == 1){
        std::cout << "Please specify s for server or c for client" << std::endl;
        return -1;
    }
    if(argv[1][0] == 's'){
        Server();
    }
    else{
        Client();
    }
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

我想扩展这个程序,以便:

  1. 服务器可以监听,客户端可以无限期地发送请求。更像是一种单向聊天系统。
  2. 服务器能够连接到多个客户端。

async_send()and置于service.run()无限循环中并没有帮助。它只是在客户端一遍又一遍地打印消息,直到客户端终止。

我什至boost::asionetwork programming. 请告诉我应该在代码中修改哪些位置以及哪些内容?

小智 6

实现异步 TCP 服务器

\n

异步 TCP 服务器是满足以下条件的分布式应用程序的一部分:

\n
    \n
  • 在客户端-服务器通信模型中充当服务器
  • \n
  • 通过 TCP 协议与客户端应用程序进行通信
  • \n
  • 使用异步 I/O 和控制操作
  • \n
  • 可以同时处理多个客户
  • \n
\n

典型的异步 TCP 服务器按照以下算法工作:

\n
    \n
  • 分配一个接受器套接字并将其绑定到特定的 TCP 端口。
  • \n
  • 启动异步接受操作。
  • \n
  • 生成一个或多个控制线程并将它们添加到运行 Boost.Asio 事件循环的线程池中。
  • \n
  • 当异步接受操作完成后,发起新的接受下一个连接请求。
  • \n
  • 启动异步读取操作,读取连接客户端的请求。
  • \n
  • 当异步读取操作完成时,处理请求并准备响应消息。
  • \n
  • 发起异步写入操作,将响应消息发送给客户端。
  • \n
  • 当异步写入操作完成时,关闭连接并释放套接字。
  • \n
\n
//responsible for handling a single client by reading the request message, processing it, and then sending back the response message.\n//Each instance of the Service class is intended to handle one connected client\n//by reading the request message, processing it, and then sending the response message back.\nclass Service\n{\npublic:\n    //The class\'s constructor accepts a shared pointer to an object representing a socket connected to a particular client as an argument\n    // and caches this pointer. This socket will be used later to communicate with the client application.\n    Service(std::shared_ptr<asio::ip::tcp::socket> sock) : m_sock(sock)\n\n    //This method starts handling the client by initiating the asynchronous reading operation\n    //to read the request message from the client specifying the onRequestReceived() method as a callback.\n    void StartHandling()\n\nprivate:\n    void onRequestReceived(const boost::system::error_code &ec,\n                           std::size_t bytes_transferred)\n\n\n    void onResponseSent(const boost::system::error_code &ec,\n                        std::size_t bytes_transferred)\n\n    // Here we perform the cleanup.\n    void onFinish()\n    {\n        delete this;\n    }\n\n    //To keep things simple,  we implement a dummy service which only emulates the execution of certain operations\n    //The request processing emulation consists of performing many increment operations to emulate operations\n    //that intensively consume CPU and then putting the thread of control to sleep for some time to emulate I/O operations\n    std::string ProcessRequest(asio::streambuf &request)\n\nprivate:\n    std::shared_ptr<asio::ip::tcp::socket> m_sock;\n    std::string m_response;\n    asio::streambuf m_request;\n};\n\n//responsible for accepting the connection requests arriving from clients and instantiating the objects of the Service class,\n// which will provide the service to connected clients.\nclass Acceptor\n{\npublic:\n    //Its constructor accepts a port number on which it will listen for the incoming connection requests as its input argument. \n    Acceptor(asio::io_service &ios, unsigned short port_num) : m_ios(ios),\n                                                               //The object of this class contains an instance of the asio::ip::tcp::acceptor class as its member named m_acceptor,\n                                                               //which is constructed in the Acceptor class\'s constructor.\n                                                               m_acceptor(m_ios,\n                                                                          asio::ip::tcp::endpoint(\n                                                                              asio::ip::address_v4::any(),\n                                                                              port_num)),\n                                                               m_isStopped(false)\n\n    //The Start() method is intended to instruct an object of the Acceptor class to start listening and accepting incoming connection requests.\n    void Start()\n\n    // Stop accepting incoming connection requests.\n    void Stop()\n\n\nprivate:\n    void InitAccept()\n\n    void onAccept(const boost::system::error_code &ec,\n                  std::shared_ptr<asio::ip::tcp::socket> sock)\n\nprivate:\n    asio::io_service &m_ios;\n    //used to asynchronously accept the incoming connection requests.\n    asio::ip::tcp::acceptor m_acceptor;\n    std::atomic<bool> m_isStopped;\n};\n\n//represents the server itself\nclass Server\n{\npublic:\n    Server()\n\n    // Start the server.\n    // Accepts a protocol port number on which the server should listen for the incoming connection requests\n    // and the number of threads to add to the pool as input arguments and starts the server\n    // Nonblocking Method\n    void Start(unsigned short port_num,\n               unsigned int thread_pool_size)\n\n    // Stop the server.\n    // Blocks the caller thread until the server is stopped and all the threads running the event loop exit.\n    void Stop()\n\nprivate:\n    asio::io_service m_ios;\n    std::unique_ptr<asio::io_service::work> m_work;\n    std::unique_ptr<Acceptor> acc;\n    std::vector<std::unique_ptr<std::thread>> m_thread_pool;\n};\n\nint main()\n{\n    unsigned short port_num = 3333;\n\n    try\n    {\n        //it instantiates an object of the Server class named srv.\n        Server srv;\n\n        //before starting the server, the optimal size of the pool is calculated.\n        // The general formula often used in parallel applications to find the optimal number of threads is the number of processors the computer has multiplied by 2.\n        // We use the std::thread::hardware_concurrency() static method to obtain the number of processors. \n        unsigned int thread_pool_size =\n            std::thread::hardware_concurrency() * 2;\n\n        //because this method may fail to do its job returning 0,\n        // we fall back to default value represented by the constant DEFAULT_THREAD_POOL_SIZE, which is equal to 2 in our case.\n        if (thread_pool_size == 0)\n            thread_pool_size = DEFAULT_THREAD_POOL_SIZE;\n\n        srv.Start(port_num, thread_pool_size);\n\n        std::this_thread::sleep_for(std::chrono::seconds(60));\n\n        srv.Stop();\n    }\n    catch (system::system_error &e)\n    {\n        std::cout << "Error occured! Error code = "\n                  << e.code() << ". Message: "\n                  << e.what();\n    }\n\n    return 0;\n}\n
Run Code Online (Sandbox Code Playgroud)\n

实现异步 TCP 客户端

\n

支持异步执行请求和请求取消功能的异步 TCP 客户端应用程序:

\n
    \n
  • 来自用户的输入应该在单独的线程\xe2\x80\x94(用户界面线程)中处理。该线程不应该被阻塞很长时间。
  • \n
  • 用户应该能够向不同的服务器发出多个请求。
  • \n
  • 用户应该能够在先前发出的请求完成之前发出新的请求。
  • \n
  • 用户应该能够在之前发出的请求完成之前取消它们。
  • \n
\n
// Function pointer type that points to the callback\n// function which is called when a request is complete.\n// Based on the values of the parameters passed to it, it outputs information about the finished request.\ntypedef void (*Callback)(unsigned int request_id,        // unique identifier of the request is assigned to the request when it was initiated.\n                         const std::string &response,    // the response data\n                         const system::error_code &ec);  // error information\n\n// data structure whose purpose is to keep the data related to a particular request while it is being executed\nstruct Session\n{\n    Session(asio::io_service &ios,\n            const std::string &raw_ip_address,\n            unsigned short port_num,\n            const std::string &request,\n            unsigned int id,\n            Callback callback) : m_sock(ios),\n                                 m_ep(asio::ip::address::from_string(raw_ip_address),\n                                      port_num),\n                                 m_request(request),\n                                 m_id(id),\n                                 m_callback(callback),\n                                 m_was_cancelled(false) {}\n\n    asio::ip::tcp::socket m_sock; // Socket used for communication\n    asio::ip::tcp::endpoint m_ep; // Remote endpoint.\n    std::string m_request;        // Request string.\n\n    // streambuf where the response will be stored.\n    asio::streambuf m_response_buf;\n    std::string m_response; // Response represented as a string.\n\n    // Contains the description of an error if one occurs during\n    // the request lifecycle.\n    system::error_code m_ec;\n\n    unsigned int m_id; // Unique ID assigned to the request.\n\n    // Pointer to the function to be called when the request\n    // completes.\n    Callback m_callback;\n\n    bool m_was_cancelled;\n    std::mutex m_cancel_guard;\n};\n\n// class that provides the asynchronous communication functionality.\nclass AsyncTCPClient : public boost::noncopyable\n{\npublic:\n    AsyncTCPClient(unsigned char num_of_threads)\n\n    // initiates a request to the server\n    void emulateLongComputationOp(\n        unsigned int duration_sec,          //represents the request parameter according to the application layer protocol\n        const std::string &raw_ip_address,  //specify the server to which the request should be sent.\n        unsigned short port_num,            //specify the server to which the request should be sent.\n        Callback callback,                  //callback function, which will be called when the request is complete.\n        unsigned int request_id)    // unique identifier of the request\n\n\n    // cancels the previously initiated request designated by the request_id argument\n    void cancelRequest(unsigned int request_id) //accepts an identifier of the request to be canceled as an argument.\n\n\n    // blocks the calling thread until all the currently running requests complete and deinitializes the client.\n    void close()\n\n\nprivate:\n    // method is called whenever the request completes with any result.\n    void onRequestComplete(std::shared_ptr<Session> session)\n\nprivate:\n    asio::io_service m_ios;\n    std::map<int, std::shared_ptr<Session>> m_active_sessions;\n    std::mutex m_active_sessions_guard;\n    std::unique_ptr<boost::asio::io_service::work> m_work;\n    std::list<std::unique_ptr<std::thread>> m_threads;\n};\n\n// a function that will serve as a callback, which we\'ll pass to the AsyncTCPClient::emulateLongComputationOp() method\n// It outputs the result of the request execution and the response message to the standard output stream if the request is completed successfully\nvoid handler(unsigned int request_id,\n             const std::string &response,\n             const system::error_code &ec)\n\nint main()\n{\n    try\n    {\n        AsyncTCPClient client(4);\n\n        // Here we emulate the user\'s behavior.\n\n        // creates an instance of the AsyncTCPClient class and then calls its emulateLongComputationOp() method to initiate three asynchronous requests\n        // User initiates a request with id 1.\n        client.emulateLongComputationOp(10, "127.0.0.1", 3333, handler, 1);\n\n        // Decides to exit the application.\n        client.close();\n    }\n    catch (system::system_error &e)\n    {\n        std::cout << "Error occured! Error code = " << e.code()\n                  << ". Message: " << e.what();\n\n        return e.code().value();\n    }\n\n    return 0;\n};\n
Run Code Online (Sandbox Code Playgroud)\n

设置环境

\n

1.安装CMake

\n
cd ~\nwget https://github.com/Kitware/CMake/releases/download/v3.14.5/cmake-3.14.5.tar.gz\ntar xf cmake-3.14.5.tar.gz\ncd cmake-3.14.5\n./bootstrap --parallel=10\nmake -j4\nsudo make -j4 install\n
Run Code Online (Sandbox Code Playgroud)\n

2.安装Boost

\n
cd ~\nwget https://boostorg.jfrog.io/artifactory/main/release/1.69.0/source/boost_1_69_0.tar.gz\ntar xf boost_1_69_0.tar.gz\ncd boost_1_69_0\n./bootstrap.sh\n./b2 ... cxxflags="-std=c++0x -stdlib=libc++" linkflags="-stdlib=libc++" ...\nsudo ./b2 toolset=gcc -j4 install\n
Run Code Online (Sandbox Code Playgroud)\n

如何建造

\n
mkdir build\ncd build\ncmake ..\ncmake --build .\n
Run Code Online (Sandbox Code Playgroud)\n

如何跑步

\n

运行服务器

\n
./bin/server\n\n
Run Code Online (Sandbox Code Playgroud)\n

我们可以检查服务器是否运行

\n
netstat -tulpn | grep LISTEN\n\ntcp        0      0 0.0.0.0:445             0.0.0.0:*               LISTEN      -\ntcp        0      0 0.0.0.0:3333            0.0.0.0:*               LISTEN      6866/./bin/server   <===============\ntcp        0      0 0.0.0.0:139             0.0.0.0:*               LISTEN      -\ntcp        0      0 127.0.0.53:53           0.0.0.0:*               LISTEN      -\ntcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      -\ntcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN      -\ntcp        0      0 127.0.0.1:5433          0.0.0.0:*               LISTEN      -\ntcp6       0      0 :::445                  :::*                    LISTEN      -\ntcp6       0      0 :::5000                 :::*                    LISTEN      -\ntcp6       0      0 :::5001                 :::*                    LISTEN      -\ntcp6       0      0 :::139                  :::*                    LISTEN      -\ntcp6       0      0 :::22                   :::*                    LISTEN      -\ntcp6       0      0 ::1:631                 :::*                    LISTEN      -\n
Run Code Online (Sandbox Code Playgroud)\n

运行客户端

\n
./bin/client\n\nRequest #1 has completed. Response: Response from server\n
Run Code Online (Sandbox Code Playgroud)\n

您可以在Boost Asio C++ Network Programming Client Server中找到该项目

\n