How to read data from the Internet with multiple threads while connecting only once?

Soh*_*oha 3 boost-asio c++11

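I'm building a small multithreaded downloader using boost::asio::ip::tcp. I need each thread to handle one part of the data. I know this can be solved by adding "Range: bytes=xx-xx" to the request headers, but I don't want the program to connect to the server that many times. Is there any solution?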
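
For contrast, the multi-connection approach the question wants to avoid needs one request per part. Each request carries a "Range: bytes=first-last" header, and the end offset is inclusive. Below is a minimal sketch of computing those headers. It assumes the total size is already known (for instance from an earlier Content-Length). `range_headers` is a hypothetical helper, not part of Asio or Beast:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper: split a resource of `total` bytes into `parts` contiguous
// byte ranges, one "Range" header per request. End offsets are inclusive, so the
// last range ends at total - 1.
std::vector<std::string> range_headers(std::uint64_t total, std::uint64_t parts) {
    assert(total > 0 && parts > 0 && parts <= total);
    std::vector<std::string> headers;
    const std::uint64_t chunk = total / parts;
    const std::uint64_t extra = total % parts; // the first `extra` parts get one more byte
    std::uint64_t start = 0;
    for (std::uint64_t i = 0; i < parts; ++i) {
        const std::uint64_t len = chunk + (i < extra ? 1 : 0);
        const std::uint64_t last = start + len - 1; // inclusive end offset
        headers.push_back("Range: bytes=" + std::to_string(start) + "-" + std::to_string(last));
        start = last + 1;
    }
    return headers;
}
```

Each of those headers would go out on its own request, which is exactly the overhead the question wants to avoid.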

seh*_*ehe 5

Just read it once and dispatch the data to worker threads when appropriate.
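
Here is a minimal sketch of that idea using only the standard library. It assumes the data arrives as newline-separated numbers on one std::istream, which stands in for the single connection. It uses std::async in place of the Asio thread pool used further down. `work`, `DispatchResult` and `read_once_and_dispatch` are hypothetical names:

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <istream>
#include <numeric>
#include <sstream>
#include <utility>
#include <vector>

// Stand-in for the per-batch job (the answer's handle_batch): sum the batch.
unsigned long long work(std::vector<std::size_t> batch) {
    return std::accumulate(batch.begin(), batch.end(), 0ull);
}

struct DispatchResult {
    unsigned long long total; // combined result of all batches
    std::size_t batches;      // how many tasks were dispatched
};

// Read the single stream once, front to back, and hand every `batch_size`
// numbers to their own asynchronous task. No seeking, no second connection.
DispatchResult read_once_and_dispatch(std::istream& in, std::size_t batch_size) {
    assert(batch_size > 0);
    std::vector<std::future<unsigned long long>> pending;
    std::vector<std::size_t> batch;

    auto dispatch = [&] {
        if (batch.empty()) return;
        // Move the batch into the task so the worker owns its data outright.
        pending.push_back(std::async(std::launch::async, work, std::move(batch)));
        batch.clear(); // put the moved-from vector back into a known empty state
    };

    for (std::size_t value; in >> value;) {
        batch.push_back(value);
        if (batch.size() == batch_size) dispatch();
    }
    dispatch(); // trailing partial batch, if any

    DispatchResult result{0, pending.size()};
    for (auto& f : pending) result.total += f.get(); // waits for each worker
    return result;
}
```

The reader never reconnects or seeks. It only decides where one batch ends and the next begins.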

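Not knowing what kind of chunks you want to process separately, let's assume you read all the primes from https://www.mathsisfun.com/includes/primes-to-100k.zip, read them in chunks, and then do some work on all the primes on separate threads.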

What Is The Work?

Here's some lazy prime work:
```cpp
void handle_batch(std::vector<size_t> params) {
    if (!params.empty()) {
        std::cout
            << "Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << std::accumulate(begin(params), end(params), 0ull)
            << std::endl;
    }
}
```

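Yeah, we just print a description of the job parameters and their sum. We can doodle on it a bit to make it more realistic. For example, we can make it take some time. We should also remember that we're on worker threads, so we want to synchronize access to the console.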
```cpp
void handle_batch(std::vector<size_t> params) {
    static std::mutex s_mx;

    if (!params.empty()) {
        // emulate some work, because I'm lazy
        auto sum = std::accumulate(begin(params), end(params), 0ull);
        // then wait some 100..200ms
        {
            using namespace std::chrono_literals;
            std::mt19937 prng(std::random_device{}());
            std::this_thread::sleep_for(
                std::uniform_real_distribution<>(100,200)(prng)*1ms);
        }

        // simple thread id (thread::id displays ugly)
        auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

        // report results to stdout
        std::lock_guard lk(s_mx); // make sure the output doesn't intermix
        std::cout
            << "Thread #" << std::setw(2) << std::setfill('0') << tid
            << " Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << sum
            << std::endl;
    }
}
```

Okay, that's enough gory detail for the unimportant bits.

The Plan

Well, the approach I chose is a bit involved. Not only does that site use https (ugh), it also serves ZIP files (ugh). And we're using C++ (ugh?).

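At least we can do the whole SSL connect business synchronously in not too much code. We do want the reading to be asynchronous, though, because that way we can demonstrate that

  • you can do a lot of intermixed IO on just the main thread using Boost Asio
  • the same goes for using Boost Process to launch zcat as a child process that unzips the primes content (we'll assume a UNIX-like system with zcat installed)
  • which means we'll be writing to that child process's stdin asynchronously
  • and also reading from its stdout asynchronously
  • spawning batch jobs as soon as they're ready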

This should be a pretty good model for your workload. The workers take more time than the IO does, yet we can do many IO tasks on a single thread without blocking.

Let's Get The Data

As said, we'll use a single thread for IO and a thread pool for the batch work:
```cpp
int main() {
    net::io_context io; // main thread does all io
    net::thread_pool pool(6); // worker threads
```

There. That's a start. Now we want to set up an SSL connection and request that ZIP. Here goes:
```cpp
http::response_parser<http::buffer_body> response_reader;
beast::flat_buffer lookahead; // for the response_reader
std::array<char,512> buf{0}; // for download content
auto ctx = ssl_context();
ssl::stream<tcp::socket> s(io, ctx);

{   // synchronously write request
    std::string host = "www.mathsisfun.com";
    connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
    http::write(s, get_request(host, "/includes/primes-to-100k.zip"));

    http::read_header(s, lookahead, response_reader);
    //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
}
```

Yup, that already read the response headers¹. Of course, we cheated, because we need three helpers:
1. Making an SSL context

   ```cpp
   auto ssl_context() {
       ssl::context ctx{ssl::context::sslv23};
       ctx.set_default_verify_paths();
       ctx.set_verify_mode(ssl::verify_peer);
       return ctx;
   }
   ```

2. Connecting over SSL

   ```cpp
   void connect_https(stream& s, std::string const& host, tcp::resolver::results_type const& eps) {
       net::connect(s.lowest_layer(), eps);
       s.lowest_layer().set_option(tcp::no_delay(true));

       // SNI: tell the server which virtual host we want
       if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
           throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
       }
       // verify_peer alone does not check that the certificate belongs to `host`
       // (host_name_verification needs Boost 1.73+; older versions: ssl::rfc2818_verification)
       s.set_verify_callback(ssl::host_name_verification(host));
       s.handshake(stream::handshake_type::client);
   }
   ```

3. Making the HTTP request

   ```cpp
   auto get_request(std::string const& host, std::string const& path) {
       using namespace http;
       request<string_body> req;
       req.version(11);
       req.method(verb::get);
       req.target("https://" + host + path);
       req.set(field::user_agent, "test");
       req.set(field::host, host);

       std::cerr << req << std::endl;
       return req;
   }
   ```

Not bad, for C++.
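
To see what actually goes over the wire, here is a hand-written equivalent of the request head that get_request() produces. The fields appear in the same order as the request echoed in the sample output. `request_head` is a hypothetical illustration, not a Beast API:

```cpp
#include <cassert>
#include <string>

// Hypothetical illustration of the request head get_request() builds, written by
// hand: request line, then one header per line, each ending in CRLF, and an empty
// line to finish the head (a GET without a body).
std::string request_head(const std::string& host, const std::string& path) {
    const std::string target = "https://" + host + path; // absolute-form, as in get_request()
    return "GET " + target + " HTTP/1.1\r\n"
           "User-Agent: test\r\n"
           "Host: " + host + "\r\n"
           "\r\n";
}
```

Most clients send the origin form (just the path) as the target. HTTP/1.1 servers are required to accept the absolute form used here as well.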

Piping It Into zcat

Now for the async part. Let's have a "pump" or "loop" that sends all the response data into a pipe:
```cpp
// now, asynchronously read contents
process::async_pipe pipe_to_zcat(io);

std::function<void(error_code, size_t)> receive_zip;
```

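receive_zip is what we call our loop. It's a self-chaining asynchronous operation. Every time it's called, it pumps the data received so far into the pipe. Then it issues one more async_read for the HTTP response: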
```cpp
receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip]
    (error_code ec, size_t /*ignore_this*/)
{
    auto& res = response_reader.get();
    auto& body = res.body();
    if (body.data) {
        // body.size is the unused remainder, so this many bytes were filled
        auto n = buf.size() - body.size;
        net::write(pipe_to_zcat, net::buffer(buf, n));
    }

    bool done = ec && ec != http::error::need_buffer;
    done = done || response_reader.is_done();

    if (done) {
        std::cerr << "receive_zip: " << ec.message() << std::endl;
        pipe_to_zcat.close();
    } else {
        body.data = buf.data();
        body.size = buf.size();

        http::async_read(s, lookahead, response_reader, receive_zip);
    }
};
```

This slightly complicated-looking read of a buffered response is taken almost literally from the documentation.
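
To see the shape of this self-chaining loop without a network, here is a simulation that uses only the standard library. Each stand-in is hypothetical:

- `MiniLoop` plays the role of `io_context`.
- `FakeBody::async_fill` plays the role of `http::async_read` with a `buffer_body`.
- The returned string plays the role of the pipe.

As in the real handler, each completion reports how much of the buffer was left unused. The handler forwards `buf.size() - unused` bytes and then re-arms itself:

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for io_context: a FIFO of ready completion handlers,
// all run on the calling thread.
struct MiniLoop {
    std::deque<std::function<void()>> ready;
    void post(std::function<void()> h) { ready.push_back(std::move(h)); }
    void run() {
        while (!ready.empty()) {
            auto h = std::move(ready.front());
            ready.pop_front();
            h();
        }
    }
};

// Hypothetical stand-in for http::async_read with a buffer_body: fills as much of
// `buf` as it can from `source`, then completes later with the number of bytes of
// `buf` left unused (like body.size after the read).
struct FakeBody {
    std::string source;
    std::size_t pos = 0;
    bool done() const { return pos == source.size(); }

    template <std::size_t N>
    void async_fill(MiniLoop& loop, std::array<char, N>& buf,
                    std::function<void(std::size_t)> handler) {
        const std::size_t n = std::min(N, source.size() - pos);
        std::copy_n(source.data() + pos, n, buf.data());
        pos += n;
        loop.post([handler, n] { handler(N - n); });
    }
};

// Self-chaining pump: each completion forwards the filled bytes to the sink,
// then re-arms itself until the body is exhausted.
std::string pump(const std::string& payload) {
    MiniLoop loop;
    FakeBody body{payload};
    std::array<char, 4> buf{};
    std::string sink; // plays the role of pipe_to_zcat

    std::function<void(std::size_t)> receive;
    receive = [&](std::size_t unused) {
        sink.append(buf.data(), buf.size() - unused); // bytes actually filled
        if (body.done()) return;                      // the real code closes the pipe here
        body.async_fill(loop, buf, receive);          // chain the next read
    };

    receive(buf.size()); // kick off: nothing filled yet
    loop.run();
    return sink;
}
```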

Now all we have to do is kick off the pump:
```cpp
// kick off receive loop
receive_zip(error_code{}, 0);
```

Interlude, Unzip

This is not the interesting part, so let's get it over with. We launch zcat as a child process and set up a second pipe to read its output from:
```cpp
process::async_pipe zcat_output(io);
process::child zcat(
   process::search_path("zcat"),
   process::std_in < pipe_to_zcat,
   process::std_out > zcat_output,
   process::on_exit([](int exitcode, std::error_code ec) {
        std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
   }), io);
```

Interlude over :)

(We even threw in error reporting, because why not?)
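
Boost.Process hides the plumbing. Here is roughly what it amounts to on a POSIX system, as a blocking sketch with plain `pipe`/`fork`/`execlp`. This is a swapped-in technique, not what the answer uses. `run_filter` is hypothetical, and `cat` stands in for `zcat` so the sketch runs without a ZIP file. Closing the write end is what lets the child see end-of-file, which is why `receive_zip` calls `pipe_to_zcat.close()` when the download completes:

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: run `program` (searched on PATH) with `input` on its stdin,
// return what it writes to stdout, and report its exit status via `exit_code`.
// Simplification: all input is written before any output is read, which only
// avoids deadlock while both fit in the OS pipe buffers. The answer sidesteps
// this by doing both directions asynchronously.
std::string run_filter(const char* program, const std::string& input, int& exit_code) {
    int to_child[2], from_child[2];
    if (pipe(to_child) != 0 || pipe(from_child) != 0)
        throw std::runtime_error("pipe failed");

    pid_t pid = fork();
    if (pid < 0) throw std::runtime_error("fork failed");
    if (pid == 0) { // child: wire the pipes to stdin/stdout, then exec
        dup2(to_child[0], STDIN_FILENO);
        dup2(from_child[1], STDOUT_FILENO);
        close(to_child[0]); close(to_child[1]);
        close(from_child[0]); close(from_child[1]);
        execlp(program, program, static_cast<char*>(nullptr));
        _exit(127); // only reached if exec failed
    }

    close(to_child[0]);   // parent writes to the child's stdin...
    close(from_child[1]); // ...and reads from its stdout
    for (std::size_t off = 0; off < input.size();) {
        ssize_t w = write(to_child[1], input.data() + off, input.size() - off);
        if (w <= 0) break;
        off += static_cast<std::size_t>(w);
    }
    close(to_child[1]); // EOF on the child's stdin, like pipe_to_zcat.close()

    std::string output;
    char buf[512];
    for (ssize_t r; (r = read(from_child[0], buf, sizeof buf)) > 0;)
        output.append(buf, static_cast<std::size_t>(r));
    close(from_child[0]);

    int status = 0;
    waitpid(pid, &status, 0);
    exit_code = WIFEXITED(status) ? WEXITSTATUS(status) : -1;
    return output;
}
```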

Ah, The Good Stuff: Primes On Tap!

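Now we have another async read loop, this time to read back the uncompressed primes. This is where we assemble the batch jobs to be processed on the worker pool.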
```cpp
std::function<void(error_code, size_t)> receive_primes;
net::streambuf sb;
```

Like receive_zip before, receive_primes is our loop driver. The sb buffer is just a buffer that makes it easy to read with a std::istream, as you'd normally do from std::cin.
```cpp
receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
    {
        std::istream is(&sb);

        // count complete lines only; a trailing partial line stays in sb
        size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
        if (n > 0) {
            std::vector<size_t> batch(n);
            for (auto& prime : batch)
                is >> prime;
            is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant

            post(pool, std::bind(handle_batch, std::move(batch)));
        }
    }

    if (ec) {
        std::cerr << "receive_primes: " << ec.message() << std::endl;
        zcat_output.close();
    } else {
        net::async_read_until(zcat_output, sb, "\n", receive_primes);
    }
};
```

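Because async_read_until may read partial lines, we count the number of complete lines (n) in the buffer and pack those into a vector. After making sure we eat the pending newline, we... finally post the batch job: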
```cpp
post(pool, std::bind(handle_batch, std::move(batch)));
```

We transfer ownership to the task because it will run on a separate thread, and the best way to handle concurrency is to minimize sharing.
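
The counting-lines step can be tried out without Asio. Here is a minimal sketch that works on a std::string instead of a net::streambuf and assumes one number per line. `take_complete_lines` is a hypothetical name. It consumes every newline-terminated line and leaves a trailing partial line in place for the next read to finish:

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper mirroring receive_primes' batching step: parse every
// complete (newline-terminated) line of `pending` as a number and remove those
// lines, leaving any trailing partial line for the next read to complete.
std::vector<std::size_t> take_complete_lines(std::string& pending) {
    std::vector<std::size_t> batch;
    const std::size_t last_nl = pending.rfind('\n');
    if (last_nl == std::string::npos) return batch; // no complete line yet

    std::istringstream is(pending.substr(0, last_nl + 1));
    for (std::size_t v; is >> v;) batch.push_back(v);

    pending.erase(0, last_nl + 1); // keep only the partial tail
    return batch;
}
```

The returned vector is then a self-contained batch that can be moved into a worker task, just as `batch` is moved into `handle_batch` above.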

Again, kick off the pump:
```cpp
// kick off handler loop as well:
receive_primes(error_code{}, 0);
```

Putting It All Together

Well. Brace yourself for the anticlimax. With all the async chains set up, all we need to do is... wait.
```cpp
    io.run();
    pool.join();
} // end of main
```

As we'd hoped, io.run() keeps running both pumps and waits for the child process, all on the main thread.

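pool.join() waits for all batch jobs to complete before stopping the thread pool. If you omit that line, some tasks might never run. That's because the destructor of thread_pool calls stop() before it calls join().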
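
The difference between join() and the destructor is easy to miss. Below is a hypothetical MiniPool that imitates the two shutdown paths described above. It is a standard-library model of the stated behaviour, not Asio's implementation:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical mini pool modelling net::thread_pool's shutdown paths:
// join() lets the workers drain the queue; stop() makes them abandon pending work.
class MiniPool {
public:
    explicit MiniPool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i) threads_.emplace_back([this] { worker(); });
    }
    ~MiniPool() { stop(); join(); } // like thread_pool's destructor: stop, then join

    void post(std::function<void()> task) {
        { std::lock_guard<std::mutex> lk(mx_); tasks_.push_back(std::move(task)); }
        cv_.notify_one();
    }
    void stop() {
        { std::lock_guard<std::mutex> lk(mx_); stopped_ = true; }
        cv_.notify_all();
    }
    void join() {
        { std::lock_guard<std::mutex> lk(mx_); draining_ = true; }
        cv_.notify_all();
        for (auto& t : threads_) if (t.joinable()) t.join();
    }

private:
    void worker() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(mx_);
                cv_.wait(lk, [this] { return stopped_ || draining_ || !tasks_.empty(); });
                // stopped: drop whatever is still queued; draining: exit once empty
                if (stopped_ || tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop_front();
            }
            task();
        }
    }

    std::mutex mx_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    bool stopped_ = false, draining_ = false;
    std::vector<std::thread> threads_;
};
```

With join() alone, every posted task runs. With stop() first (which is what the destructor does), tasks still queued are simply dropped.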

Play with the buffer size (512 bytes in my example) to see how large the batches get. Note that those 512 bytes are compressed bytes.

“Unlive” Demo

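Sadly, no online compiler I know of supports external network access, so you'll have to run this one yourself. For convenience, here's the full listing and sample output from a run on my machine: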

Live On Coliru
```cpp
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/http.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <algorithm>
#include <array>
#include <functional>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <numeric>
#include <random>
#include <string>
#include <thread>
#include <vector>

void handle_batch(std::vector<size_t> params) {
    static std::mutex s_mx;

    if (!params.empty()) {
        // emulate some work, because I'm lazy
        auto sum = std::accumulate(begin(params), end(params), 0ull);
        // then wait some 100..200ms
        {
            using namespace std::chrono_literals;
            std::mt19937 prng(std::random_device{}());
            std::this_thread::sleep_for(
                std::uniform_real_distribution<>(100,200)(prng)*1ms);
        }

        // simple thread id (thread::id displays ugly)
        auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

        // report results to stdout
        std::lock_guard lk(s_mx); // make sure the output doesn't intermix
        std::cout
            << "Thread #" << std::setw(2) << std::setfill('0') << tid
            << " Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << sum
            << std::endl;
    }
}

namespace net     = boost::asio;
namespace ssl     = net::ssl;
namespace beast   = boost::beast;
namespace http    = beast::http;
namespace process = boost::process;

using boost::system::error_code;
using boost::system::system_error;
using net::ip::tcp;
using stream = ssl::stream<tcp::socket>;

auto ssl_context() {
    ssl::context ctx{ssl::context::sslv23};
    ctx.set_default_verify_paths();
    ctx.set_verify_mode(ssl::verify_peer);
    return ctx;
}

void connect_https(stream& s, std::string const& host, tcp::resolver::results_type const& eps) {
    net::connect(s.lowest_layer(), eps);
    s.lowest_layer().set_option(tcp::no_delay(true));

    // SNI: tell the server which virtual host we want
    if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
        throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
    }
    // verify_peer alone does not check that the certificate belongs to `host`
    s.set_verify_callback(ssl::host_name_verification(host)); // Boost 1.73+
    s.handshake(stream::handshake_type::client);
}

auto get_request(std::string const& host, std::string const& path) {
    using namespace http;
    request<string_body> req;
    req.version(11);
    req.method(verb::get);
    req.target("https://" + host + path);
    req.set(field::user_agent, "test");
    req.set(field::host, host);

    std::cerr << req << std::endl;
    return req;
}

int main() {
    net::io_context io; // main thread does all io
    net::thread_pool pool(6); // worker threads

    // outside for lifetime
    http::response_parser<http::buffer_body> response_reader;
    beast::flat_buffer lookahead; // for the response_reader
    std::array<char,512> buf{0}; // for download content
    auto ctx = ssl_context();
    ssl::stream<tcp::socket> s(io, ctx);

    {   // synchronously write request
        std::string host = "www.mathsisfun.com";
        connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
        http::write(s, get_request(host, "/includes/primes-to-100k.zip"));

        http::read_header(s, lookahead, response_reader);
        //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
    }

    // now, asynchronously read contents
    process::async_pipe pipe_to_zcat(io);

    std::function<void(error_code, size_t)> receive_zip;
    receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip](error_code ec, size_t /*ignore_this*/) {
        auto& res = response_reader.get();
        auto& body = res.body();
        if (body.data) {
            // body.size is the unused remainder, so this many bytes were filled
            auto n = buf.size() - body.size;
            net::write(pipe_to_zcat, net::buffer(buf, n));
        }

        bool done = ec && ec != http::error::need_buffer;
        done = done || response_reader.is_done();

        if (done) {
            std::cerr << "receive_zip: " << ec.message() << std::endl;
            pipe_to_zcat.close();
        } else {
            body.data = buf.data();
            body.size = buf.size();

            http::async_read(s, lookahead, response_reader, receive_zip);
        }
    };

    // kick off receive loop
    receive_zip(error_code{}, 0);

    process::async_pipe zcat_output(io);
    process::child zcat(
       process::search_path("zcat"),
       process::std_in < pipe_to_zcat,
       process::std_out > zcat_output,
       process::on_exit([](int exitcode, std::error_code ec) {
            std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
       }), io);

    std::function<void(error_code, size_t)> receive_primes;
    net::streambuf sb;
    receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
        {
            std::istream is(&sb);

            // count complete lines only; a trailing partial line stays in sb
            size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
            if (n > 0) {
                std::vector<size_t> batch(n);
                for (auto& prime : batch)
                    is >> prime;
                is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant

                post(pool, std::bind(handle_batch, std::move(batch)));
            }
        }

        if (ec) {
            std::cerr << "receive_primes: " << ec.message() << std::endl;
            zcat_output.close();
        } else {
            net::async_read_until(zcat_output, sb, "\n", receive_primes);
        }
    };
    // kick off handler loop as well:
    receive_primes(error_code{}, 0);

    io.run();
    pool.join();
}
```

Output:
```
GET https://www.mathsisfun.com/includes/primes-to-100k.zip HTTP/1.1
User-Agent: test
Host: www.mathsisfun.com


receive_zip: Success
Child process exited with 0 (Success)
receive_primes: End of file
Thread #11 Batch n:95   Range [599..1237]   Sum:86587
Thread #58 Batch n:170  Range [1249..2549]  Sum:320714
Thread #34 Batch n:170  Range [2551..3919]  Sum:549880
Thread #54 Batch n:170  Range [3923..5407]  Sum:790922
Thread #30 Batch n:170  Range [5413..6863]  Sum:1040712
Thread #60 Batch n:108  Range [2..593]  Sum:28697
Thread #58 Batch n:170  Range [8429..9923]  Sum:1560462
Thread #11 Batch n:170  Range [6869..8423]  Sum:1298732
Thread #30 Batch n:146  Range [12703..14087]    Sum:1956410
Thread #34 Batch n:147  Range [9929..11329] Sum:1563023
Thread #54 Batch n:146  Range [11351..12697]    Sum:1758964
Thread #60 Batch n:146  Range [14107..15473]    Sum:2164462
Thread #11 Batch n:146  Range [16943..18313]    Sum:2576764
Thread #34 Batch n:146  Range [19861..21313]    Sum:3003048
Thread #30 Batch n:146  Range [18329..19853]    Sum:2790654
Thread #58 Batch n:146  Range [15493..16937]    Sum:2365198
Thread #60 Batch n:146  Range [22721..24109]    Sum:3422310
Thread #54 Batch n:146  Range [21317..22717]    Sum:3212180
Thread #30 Batch n:146  Range [27179..28661]    Sum:4081540
Thread #11 Batch n:146  Range [24113..25693]    Sum:3640476
Thread #34 Batch n:146  Range [25703..27143]    Sum:3859484
Thread #60 Batch n:146  Range [30223..31741]    Sum:4525378
Thread #54 Batch n:146  Range [31751..33211]    Sum:4746372
Thread #58 Batch n:146  Range [28663..30211]    Sum:4297314
Thread #30 Batch n:146  Range [33223..34693]    Sum:4958972
Thread #34 Batch n:146  Range [36307..37799]    Sum:5408028
Thread #11 Batch n:146  Range [34703..36299]    Sum:5184000
Thread #54 Batch n:146  Range [39371..40973]    Sum:5865356
Thread #60 Batch n:146  Range [37811..39367]    Sum:5637612
Thread #58 Batch n:146  Range [40993..42433]    Sum:6091022
Thread #34 Batch n:146  Range [44029..45613]    Sum:6541984
Thread #54 Batch n:146  Range [47287..48817]    Sum:7013764
Thread #30 Batch n:146  Range [42437..44027]    Sum:6308156
Thread #11 Batch n:146  Range [45631..47279]    Sum:6780582
Thread #58 Batch n:146  Range [50341..51913]    Sum:7470486
Thread #34 Batch n:146  Range [51929..53569]    Sum:7701048
Thread #60 Batch n:146  Range [48821..50333]    Sum:7239008
Thread #54 Batch n:146  Range [53591..55147]    Sum:7934798
Thread #11 Batch n:146  Range [56713..58211]    Sum:8388956
Thread #58 Batch n:146  Range [58217..59771]    Sum:8617316
Thread #30 Batch n:146  Range [55163..56711]    Sum:8169020
Thread #60 Batch n:146  Range [61519..63197]    Sum:9100594
Thread #34 Batch n:146  Range [59779..61511]    Sum:8856806
Thread #54 Batch n:146  Range [63199..64849]    Sum:9339328
Thread #11 Batch n:146  Range [64853..66457]    Sum:9580694
Thread #58 Batch n:146  Range [66463..67979]    Sum:9816826
Thread #30 Batch n:146  Range [67987..69779]    Sum:10057662
Thread #54 Batch n:146  Range [72931..74573]    Sum:10770902
Thread #34 Batch n:146  Range [71347..72923]    Sum:10529702
Thread #60 Batch n:146  Range [69809..71341]    Sum:10304156
Thread #11 Batch n:146  Range [74587..76231]    Sum:11008056
Thread #58 Batch n:146  Range [76243..77801]    Sum:11251048
Thread #30 Batch n:146  Range [77813..79561]    Sum:11491034
Thread #34 Batch n:146  Range [81119..82729]    Sum:11963076
Thread #60 Batch n:146  Range [82757..84449]    Sum:12207776
Thread #58 Batch n:146  Range [86183..87767]    Sum:12700772
Thread #54 Batch n:146  Range [79579..81101]    Sum:11732042
Thread #11 Batch n:146  Range [84457..86179]    Sum:12455242
Thread #30 Batch n:146  Range [87793..89527]    Sum:12951322
Thread #34 Batch n:146  Range [89533..91153]    Sum:13187046
Thread #54 Batch n:146  Range [94441..96013]    Sum:13904802
Thread #30 Batch n:146  Range [97829..99487]    Sum:14403556
Thread #58 Batch n:146  Range [92779..94439]    Sum:13665032
Thread #60 Batch n:146  Range [91159..92767]    Sum:13431876
Thread #11 Batch n:146  Range [96017..97813]    Sum:14148718
Thread #34 Batch n:46   Range [99497..99991]    Sum:4588078
```

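¹ You can print them by uncommenting that line. Note that Boost 1.70 doesn't have streaming implemented for this, and Boost 1.72 has a bug regarding boost::process::async_pipe, so you need 1.73 to actually print the headers like that.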