使用 boost.process 同时读取和写入子级的 stdio

jag*_*jun 1 c++ windows boost boost-asio boost-process

我正在尝试使用 boost.process 写入和读取子级的 stdio,如下所示:

boost::asio::io_service writeService, readService;
bp::async_pipe in{writeService};
bp::async_pipe out{readService};

bp::child process(CompressCmd.c_str(), bp::std_in < in, bp::std_out > out);
Buffer src;
src.reserve(4 * 1024 * 1024);
integer_type read = 0;
//std::atomic_int64_t totalWrite{0};
integer_type totalWrite = 0;
while (callback(CallbackActions::NeedMoreInput, src, read)) {
    in.async_write_some(
        boost::asio::buffer(src.data(), read),
        [](const boost::system::error_code &e, std::size_t) { });
    // written data is not important, that's why using same buffer
    out.async_read_some(boost::asio::buffer(src.data(), src.capacity()),
                        [&](const boost::system::error_code &e,
                           std::size_t byte_transferred) { totalWrite += byte_transferred; });
}
writeService.run();
in.close();
readService.run();
Run Code Online (Sandbox Code Playgroud)

所有读写操作都被通知为成功,但totalWrite的值完全不正确,报告为29356032,实际值应该约为50000000
我注意到程序正在中途终止,
在readService.run()冻结子进程后使用process.wait(),
使用atomic int 会产生相同的行为,

现在我实际上只需要知道实际写入了多少数据,这就是我使用相同缓冲区的原因

seh*_*ehe 6

    \n
  1. 这个模式:

    \n\n
    while (callback(CallbackActions::NeedMoreInput, src, read)) {\n    in.async_write_some(...);\n    out.async_read_some(...);\n}\n
    Run Code Online (Sandbox Code Playgroud)\n\n

    很可能被误导(异步操作总是立即返回,因此您只需继续添加更多异步操作而不给它们运行的​​机会)。

  2. \n
  3. 同样误导的事实是,您为管道提供了单独的服务,但您在完全排除的情况下运行它们,因此在 writeService 完成之前不会运行任何读取操作。

  4. \n
  5. atomic类型被误导,因为无法从多个线程进行访问

  6. \n
  7. 你想做什么?您保留了一个大缓冲区,但从未将任何数据放入其中(reserve!= resize)。因此你只能希望什么也不写。

    \n\n

    更讽刺的是,您正在完全相同的位置读取完全相同的缓冲区。然而,这立即是未定义的行为\xc2\xb9,因为src.capacity()当你知道的时候你就通过了它src.size()==0

    \n\n

    即使没有该错误,您如何“同时”从内存中完全相同的字节读取和写入,并且仍然知道预期结果是什么?

  8. \n
  9. 你没有将自己的传递io_service给Boost Process

  10. \n
\n\n

一个工作演示

\n\n

这是一个工作示例。当然,我必须猜测你到底想做什么。

\n\n

我选择让程序将其自己的源代码 (main.cpp) 发送到 stdin,并迭代读取 stdout,记录字节total_received。然后它会打印退出代码和总数。

\n\n

我使用它作为临时压缩器,\'/usr/bin/xxd\'因为它可用,甚至可以有效地打印std::cout以进行调试。

\n\n

Live On Coliru// Coliru 遇到麻烦

\n\n
#include <boost/asio.hpp>\n#include <boost/process.hpp>\n#include <boost/process/async.hpp>\n#include <iostream>\nstd::vector<char> read_file(std::string const&);\n\nnamespace bp = boost::process;\nusing boost::system::error_code;\n\nusing Loop = boost::function<void()>;\nusing Buffer = std::array<char, 4*1024>;\n\nint main() {\n    boost::asio::io_service svc;\n\n    std::string const CompressCmd = "/usr/bin/xxd";\n\n    bp::async_pipe in{svc}, out{svc};\n    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);\n\n    auto data = read_file("main.cpp");\n\n    Loop read_loop, write_loop;\n\n    Buffer recv_buffer;\n    std::size_t total_received = 0;\n    read_loop = [&read_loop, &out, &recv_buffer, &total_received] {\n        out.async_read_some(boost::asio::buffer(recv_buffer),\n            [&](error_code ec, size_t transferred) {\n                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\\n";\n                total_received += transferred; \n                if (!ec)\n                    read_loop(); // continue reading\n            });\n    };\n\n    boost::asio::async_write(in, boost::asio::buffer(data),\n        [&](error_code ec, size_t transferred) {\n            std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\\n";\n            in.close(ec);\n            std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\\n";\n        }); // async\n\n    read_loop(); // async\n\n    svc.run(); // Await all async operations\n\n    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\\n";\n}\n\n#include <fstream>\n#include <iterator>\nstd::vector<char> read_file(std::string const& fname) {\n    std::ifstream ifs(fname);\n    return {std::istreambuf_iterator<char>(ifs), {}};\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

印刷

\n\n
WriteLoop: Success done, 1787 bytes\nWriteLoop: closed pipe (Success)\nReadLoop: Success got 4096 bytes\nReadLoop: Success got 3515 bytes\nReadLoop: End of file got 0 bytes\nProcess exitcode 0, total_received=7611\n
Run Code Online (Sandbox Code Playgroud)\n\n

解释、简化

\n\n

请注意,我们在没有循环的情况下完成所有编写。那是因为这boost::asio::async_write是一个组合操作(它隐藏了循环)。

\n\n

同样,如果您可以“负担”将整个接收到的数据存储在内存中,则可以通过使用boost::asio::streambuf和使用类似的组合操作来简化:

\n\n

Live On Coliru// Coliru 遇到麻烦

\n\n
#include <boost/asio.hpp>\n#include <boost/process.hpp>\n#include <boost/process/async.hpp>\n#include <iostream>\nstd::vector<char> read_file(std::string const&);\n\nnamespace bp = boost::process;\nusing boost::system::error_code;\n\nint main() {\n    boost::asio::io_service svc;\n\n    std::string const CompressCmd = "/usr/bin/xxd";\n\n    bp::async_pipe in{svc}, out{svc};\n    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);\n\n    auto data = read_file("main.cpp");\n\n    boost::asio::streambuf recv_buffer;\n    boost::asio::async_read(out, recv_buffer,\n            [&](error_code ec, size_t transferred) {\n                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\\n";\n            });\n\n    boost::asio::async_write(in, boost::asio::buffer(data),\n        [&](error_code ec, size_t transferred) {\n            std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\\n";\n            in.close(ec);\n            std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\\n";\n        }); // async\n\n    svc.run(); // Await all async operations\n\n    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << recv_buffer.size() << "\\n";\n}\n\n#include <fstream>\n#include <iterator>\nstd::vector<char> read_file(std::string const& fname) {\n    std::ifstream ifs(fname);\n    return {std::istreambuf_iterator<char>(ifs), {}};\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n

相反,如果您无法在发送之前将所有数据存储在内存中,则可以创建一个循环来按块发送输入

\n
\n\n

两个带延迟的异步循环

\n\n

让我们这样做,并在编写每个块之前延迟一秒钟,使其变得更有趣。您期望看到的是由于延迟而发生的交替读/写:

\n\n

Live On Coliru// 耶,在 Coliru 上运行

\n\n
#include <boost/asio.hpp>\n#include <boost/asio/high_resolution_timer.hpp>\n#include <boost/process.hpp>\n#include <boost/process/async.hpp>\n#include <iostream>\n#include <fstream>\n\nnamespace bp = boost::process;\nusing boost::system::error_code;\nusing namespace std::chrono_literals;\n\nusing Loop = boost::function<void()>;\nusing Buffer = std::array<char, 500>;\n\nint main() {\n    boost::asio::io_service svc;\n    auto on_exit = [](int code, std::error_code ec) {\n            std::cout << "Exited " << code << " (" << ec.message() << ")\\n";\n        };\n\n    std::string const CompressCmd = "/usr/bin/xxd";\n\n    bp::async_pipe in{svc}, out{svc};\n    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc, bp::on_exit(on_exit));\n\n    Loop read_loop, write_loop;\n\n    Buffer recv_buffer;\n    std::size_t total_received = 0;\n    read_loop = [&read_loop, &out, &recv_buffer, &total_received] {\n        out.async_read_some(boost::asio::buffer(recv_buffer),\n            [&](error_code ec, size_t transferred) {\n                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\\n";\n                total_received += transferred; \n                if (!ec)\n                    read_loop(); // continue reading\n            });\n    };\n\n    std::ifstream ifs("main.cpp");\n    std::size_t total_written = 0;\n    Buffer send_buffer;\n    boost::asio::high_resolution_timer send_delay(svc);\n    write_loop = [&write_loop, &in, &ifs, &send_buffer, &total_written, &send_delay] {\n        if (!ifs.good())\n        {\n            error_code ec;\n            in.close(ec);\n            std::cout << "WriteLoop: closed stdin (" << ec.message() << ")\\n";\n            return;\n        }\n        ifs.read(send_buffer.data(), send_buffer.size());\n\n        boost::asio::async_write(in, boost::asio::buffer(send_buffer.data(), ifs.gcount()),\n            [&](error_code ec, size_t transferred) {\n                std::cout << "WriteLoop: " << ec.message() << " sent " << transferred << " bytes\\n";\n                total_written += transferred; \n                if (!ec) {\n                    send_delay.expires_from_now(1s);\n                    send_delay.async_wait([&write_loop](error_code ec) {\n                        std::cout << "WriteLoop: send delay " << ec.message() << "\\n";\n                        if (!ec) write_loop(); // continue writing\n                    });\n                }\n            });\n    };\n\n    read_loop(); // async\n    write_loop(); // async\n\n    svc.run(); // Await all async operations\n\n    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\\n";\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

印刷

\n\n
WriteLoop: Success sent 500 bytes\nWriteLoop: send delay Success\nWriteLoop: Success sent 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 96 bytes\nWriteLoop: send delay Success\nWriteLoop: Success sent 500 bytes\nWriteLoop: send delay Success\nWriteLoop: Success sent 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 96 bytes\nWriteLoop: send delay Success\nWriteLoop: Success sent 500 bytes\nWriteLoop: send delay Success\nWriteLoop: Success sent 134 bytes\nWriteLoop: send delay Success\nWriteLoop: closed stdin (Success)\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 500 bytes\nReadLoop: Success got 22 bytes\nExited 0 (Success)\nReadLoop: End of file got 0 bytes\nProcess exitcode 0, total_received=11214\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n

\xc2\xb9 也许只是未指定,我现在不想找出差异

\n