我如何利用boost :: packaged_task,函数参数和boost :: asio :: io_service?

Joh*_*orn 6 c++ multithreading boost boost-bind boost-asio

首先,我想为冗长的帖子道歉.我希望尽可能彻底.

我已经在这个问题上坚持了几天,并且关于正确使用boost::packaged_task具有输入参数的函数的信息很少令人惊讶.

系统信息

  • C++ 03
  • 提升1.54.0
  • CMake 2.8.9

初始要求

  1. 我有一个由客户端,服务器和设备组成的设置.
  2. 客户端通过向服务器发送请求来与设备交互.
    • 检查这些请求并将其路由到适当的设备.
    • 请求是异步处理的,并且boost::asio::io_service::strand由于各种原因偶尔会排队等候.
  3. 请求被放入设备本身的本地队列中.
    • 当请求已被确认(未完成)时,将为其分配一个ID,并返回给客户端.

打包任务

在查看boost :: futures之后,我们决定使用boost :: packaged_task完全符合我们的需要.但是,打包任务的实现似乎存在错误.

似乎packaged_task有几个不同的模板可供选择:

  1. packaged_task<R>
  2. packaged_task<R()>
  3. packaged_task<R(ArgTypes)>
  4. 其他我可能会失踪的人.

为了确保我正确使用该功能,我开始很简单; 使用boost :: futures页面上的简单示例作为起点.从那里,我创建了四个简单的函数:

  • int返回,没有参数.
  • int return,带参数.
  • std::string 返回,没有参数.
  • std::string 返回,带参数.

测试功能

std::string ans("forty two");

int int_no_params()
{
    return 42;
}

int int_with_params(int param)
{
    return param;
}

std::string string_no_params()
{
    return std::string("forty two");
}

std::string string_with_params(std::string & param) // Have tried both with and without '&'
{
    return param;
}
Run Code Online (Sandbox Code Playgroud)

例1:

int function(void)

    //! Compiles and produces correct result.  
    {
        boost::packaged_task<int()> example(int_no_params);
        boost::future<int> f = example.get_future();
        boost::thread task(boost::move(example));
        int answer = f.get();
        std::cout << "Answer to life and whatnot, in English: " << answer << std::endl;
        task.join();
    }
Run Code Online (Sandbox Code Playgroud)

例2:

std::string function(void)

    //! Compiles and produces correct result.
    {
        boost::packaged_task<std::string()> example(string_no_params);
        boost::future<std::string> f = example.get_future();
        boost::thread task(boost::move(example));
        std::string answer = f.get();
        std::cout << "string_no_params: " << answer << std::endl;
        task.join();
    }
Run Code Online (Sandbox Code Playgroud)

例3:

std::string(std::string& param) 没有线程

//! Doesn't compile.
//! error: variable ‘boost::packaged_task<std::basic_string<char>(std::basic_string<char>&)> example’ has initializer but incomplete type

{
    boost::packaged_task<std::string(std::string&)> example(string_with_params);
    boost::future<std::string> f = example.get_future();
    example(ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
}
Run Code Online (Sandbox Code Playgroud)

例4:

使用boost :: threading

//! Doesn't compile.
//! error: variable ‘boost::packaged_task<std::basic_string<char>(std::basic_string<char>&)> example’ has initializer but incomplete type
{
    boost::packaged_task<std::string(std::string&)> example(string_with_params);
    boost::future<std::string> f = example.get_future();
    boost::thread task(boost::move(example), ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
}
Run Code Online (Sandbox Code Playgroud)

例5:

在packaged_task声明中使用扩展初始值设定项

//! Doesn't compile in C++03, C++11 only.
//! error: extended initializer lists only available with -std=c++11 or -std=gnu++11 [-Werror]
{
    boost::packaged_task<std::string(std::string&)> example
    { boost::bind(&string_with_params, ans) };
    boost::future<std::string> f = example.get_future();
    boost::thread task(boost::move(example), ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
}
Run Code Online (Sandbox Code Playgroud)

例6:

使用shared_ptr进行线程化

以下用途 typedef boost::packaged_task<std::string(std::string&)> task_t;

由于无法复制打包的任务,因此绑定shared_ptr<T>::operator()此处task提供的建议解决方案.

// error: invalid use of incomplete type ‘class boost::packaged_task<std::basic_string<char>(std::basic_string<char>&)>’
// error: incomplete type ‘task_t {aka boost::packaged_task<std::basic_string<char>(std::basic_string<char>&)>}’ used in nested name specifier
// boost/thread/future.hpp:1320:11: error: declaration of ‘class boost::packaged_task<std::basic_string<char>(std::basic_string<char>&)>’
{
    boost::shared_ptr<task_t> example = boost::make_shared<task_t>(boost::bind(&string_with_params, ans));
    boost::future<std::string> f = example->get_future();
    boost::thread task(boost::bind(&task_t::operator(), example));
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
}
Run Code Online (Sandbox Code Playgroud)

例7:

使用boost::asio::io_serviceboost::bind

//错误:无效使用不完整类型'类boost :: packaged_task(std :: basic_string&)>'//错误:不完整类型'task_t {aka boost :: packaged_task(std :: basic_string&)>}'用于嵌套名称说明符// boost/thread/future.hpp:1320:11:错误:'class boost :: packaged_task(std :: basic_string&)>'的声明

{
    boost::asio::io_service io_service;
    boost::thread_group threads;
    boost::asio::io_service::work work(io_service);

    for (int i = 0; i < 3; ++i)
    {
        threads.create_thread(boost::bind(&boost::asio::io_service::run,
            &io_service));
    }

    boost::shared_ptr<task_t> example = boost::make_shared<task_t>(boost::bind(&string_with_params, ans));
    boost::future<std::string> f = example->get_future();
    io_service.post(boost::bind(&task_t::operator(), example));
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    threads.join_all();
}
Run Code Online (Sandbox Code Playgroud)

我在这里做错了吗?我觉得我已经详尽地对此进行了测试,并没有取得任何进展.我已经尝试了绑定,线程和任务的所有其他组合来实现这一点,但它根本就没有发生.我感谢您提供的任何帮助.

最后说明:

我有一个使用期货和承诺的工作解决方案,并通过使用私有函数发布到我的线程,我返回一个有效的未来.这个问题似乎不一定是用户错误.

谢谢阅读.

Tan*_*ury 5

虽然我找不到文档中明确指出的限制,但更改历史记录指出向Boost.Thread提供参数类型的能力packaged_task是符合C++ 11的:

C++ 11合规性:添加ArgTypespackaged_task模板.BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK定义何时定义(Boost 1.55的默认值).

相关票证表明当可变参数模板不可用时,R()将仅提供签名.

由于C++ 03缺少可变参数模板,示例3-7将失败.此外,存在与示例6和7不匹配的类型.虽然将task_t其函数类型指定为std::string(std::string&),但是第一个且唯一的参数在期间被绑定到仿函数boost::bind().由于生成的函子不再需要参数,因此提供的函数类型packaged_task应该是std::string().

虽然packaged_task不支持C++ 03中的参数,但一个中间解决方案是创建一个包含较低级别的仿函数类型boost::promise.如果没有对可变参数模板和完美转发的支持,将会有很多operator()过载的样板代码.不过,这里有一个基本的示例函子忽略了promise和之间的异常处理future:

/// @brief basic_task to support function types with arguments.  This
///        provides a minimal feature workaround to Boost.Thread's
///        packaged_task not supporting argument types for C++03.
template <typename Fn>
class basic_task
{
public:
  // @brief The type the future will return.
  typedef typename boost::function_types::result_type<Fn>::type result_type;

  typedef boost::promise<result_type> promise_type;

  /// @brief Constructor.
  template <typename F> 
  explicit basic_task(const F& f)
    : fn_(f),
      promise_(boost::make_shared<promise_type>())
  {}

  // Overload operator() functions.

  void operator()()
  {
    promise_->set_value(fn_());
  }

  template <typename A1>
  void operator()(const A1& a1)
  {
    promise_->set_value(fn_(a1));
  }

  template <typename A1>
  void operator()(A1& a1)
  {
    promise_->set_value(fn_(a1));
  }

  /// @brief Get a future for this task' promise.
  boost::unique_future<result_type>
  get_future()
  {
    return promise_->get_future();
  }

private:
  boost::function<Fn> fn_;
  boost::shared_ptr<promise_type> promise_;
};
Run Code Online (Sandbox Code Playgroud)

完整系列的例子:

#include <iostream>
#include <string>

#define BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK

#include <boost/asio.hpp>
#include <boost/function_types/result_type.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>

/// @brief basic_task to support function types with arguments.  This
///        provides a minimal feature workaround to Boost.Thread's
///        packaged_task not supporting argument types for C++03.
template <typename Fn>
class basic_task
{
public:
  // @brief The type the future will return.
  typedef typename boost::function_types::result_type<Fn>::type result_type;

  typedef boost::promise<result_type> promise_type;

  /// @brief Constructor.
  template <typename F> 
  explicit basic_task(const F& f)
    : fn_(f),
      promise_(boost::make_shared<promise_type>())
  {}

  // Overload operator() functions.

  void operator()()
  {
    promise_->set_value(fn_());
  }

  template <typename A1>
  void operator()(const A1& a1)
  {
    promise_->set_value(fn_(a1));
  }

  template <typename A1>
  void operator()(A1& a1)
  {
    promise_->set_value(fn_(a1));
  }

  /// @brief Get a future for this task' promise.
  boost::unique_future<result_type>
  get_future()
  {
    return promise_->get_future();
  }

private:
  boost::function<Fn> fn_;
  boost::shared_ptr<promise_type> promise_;
};

std::string ans("forty two");

int int_no_params()
{
  return 42;
}

int int_with_params(int param)
{
  return param;
}

std::string string_no_params()
{
  return std::string("forty two");
}

std::string string_with_params(std::string & param)
{
  return param;
}

int main()
{
  // example 1
  {
    boost::packaged_task<int()> example(&int_no_params);
    boost::unique_future<int> f = example.get_future();
    boost::thread task(boost::move(example));
    int answer = f.get();
    std::cout << "Answer to life and whatnot, in English: "
              << answer << std::endl;
    task.join();
  }

  // example 2
  {
    boost::packaged_task<std::string()> example(&string_no_params);
    boost::unique_future<std::string> f = example.get_future();
    boost::thread task(boost::move(example));
    std::string answer = f.get();
    std::cout << "string_no_params: " << answer << std::endl;
    task.join();
  }

  // example 3
  {
    basic_task<std::string(std::string&)> example(&string_with_params);
    boost::unique_future<std::string> f = example.get_future();
    example(ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
  }

  // example 4
  {
    basic_task<std::string(std::string&)> example(&string_with_params);
    boost::unique_future<std::string> f = example.get_future();
    boost::thread task(boost::move(example), ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
  }

  // example 5
  {
    basic_task<std::string(std::string&)>
        example(boost::bind(&string_with_params, ans));
    boost::unique_future<std::string> f = example.get_future();
    boost::thread task(boost::move(example), ans);
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
  }

  // example 6
  {
    typedef boost::packaged_task<std::string()> task_t;
    boost::shared_ptr<task_t> example =
        boost::make_shared<task_t>(boost::bind(&string_with_params, ans));
    boost::unique_future<std::string> f = example->get_future();
    boost::thread task(boost::bind(&task_t::operator(), example));
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    task.join();
  }

  // example 7
  {
    boost::asio::io_service io_service;
    boost::thread_group threads;
    boost::asio::io_service::work work(io_service);

    for (int i = 0; i < 3; ++i)
      threads.create_thread(
          boost::bind(&boost::asio::io_service::run, &io_service));

    typedef boost::packaged_task<std::string()> task_t;
    boost::shared_ptr<task_t> example =
        boost::make_shared<task_t>(boost::bind(&string_with_params, ans));
    boost::unique_future<std::string> f = example->get_future();
    io_service.post(boost::bind(&task_t::operator(), example));
    std::string answer = f.get();
    std::cout << "string_with_params: " << answer << std::endl;
    io_service.stop();
    threads.join_all();
  }
}
Run Code Online (Sandbox Code Playgroud)

结果输出:

Answer to life and whatnot, in English: 42
string_no_params: forty two
string_with_params: forty two
string_with_params: forty two
string_with_params: forty two
string_with_params: forty two
string_with_params: forty two
Run Code Online (Sandbox Code Playgroud)