用于基于任务的并行性的通用c ++ 11函数包装器

rre*_*iki 5 c++ c++11

我正在实现一个工作窃取算法,并正在编写一个通用的函数包装器,它将promise作为包装器模板的可变参数之一.我想用这些函数包装器创建任务,并使每个节点使用promises与依赖节点进行通信.每个节点都维护一个依赖节点和承诺/期货的列表.每个节点都可以通过检查是否已设置所有期货来运行.promises可以根据函数包装器返回不同对象的作业而变化.如果单个算法可以分解为单独的操作,如读取消息和解码消息,则对对象执行检查,返回所有检查的结果,这些操作中的每一个都将返回不同的promise(对象,布尔值,结果).

这本书,C++ Concurrency in Action,有一个函数包装器实现,但是它没有处理这个用例.在网上的其他参考文献中,我看到过像std :: promise这样的承诺的硬编码引用,它只是一种类型.

有人可以建议我如何编写一个包装器来实现以下目的......

void add(int a, int b, std::promise<int>&& prms)
{
   int res = a + b;
   try {
      prms.set_value(res);
   }
   catch(...)
   {
      prms.set_exception(std::current_exception());
   }
}

int main()
{
   std::promise<int> prms;
   std::future<int> fut = prms.get_future();
   FunctionWrapper myFunctor(a, 10, 20, std::move(prms));

   // add the functor to the queue and it will be retrieved by a thread
   // that executes the task. since i have the future, i can pass it to the 
   // dependent worknode
}
Run Code Online (Sandbox Code Playgroud)

我尝试编写如下代码......但是在使用它时遇到了困难.

#ifndef FUNCTIONWRAPPER_HPP
#define FUNCTIONWRAPPER_HPP

template<typename F, typename R, typename... Args>
class FunctionWrapper
{
  class implbase
  {
  public:
    virtual ~implbase();
    virtual R execute(Args...)=0;
  };

  class impl : public implbase
  {
  public:
    impl(F&& f) : func(std::move(f)) {}
    virtual R execute(Args... args) { return func(args...); }

  private:
    F func;
  };

  std::shared_ptr<impl> internalFunc;

public:
  FunctionWrapper(F&& f) : internalFunc(0)
  {
    internalFunc = new impl<F, R, Args...>(f);
  }

  FunctionWrapper(const FunctionWrapper& other)
    : internalFunc(std::move(other.internalFunc))
  {}

  ~FunctionWrapper()
  {
    if(internalFunc)
      delete internalFunc;
  }

  R operator()(Args... args)
  {
    return internalFunc->execute(args...);
  }

  void swap(FunctionWrapper& other)
  {
    impl<R, Args...>* tmp = internalFunc;
    internalFunc = other.internalFunc;
    other.internalFunc = tmp;
  }

  FunctionWrapper& operator=(const FunctionWrapper& other)
  {
    FunctionWrapper(other).swap(*this);
    return *this;
  }

  FunctionWrapper& operator=(const F& f)
  {
    FunctionWrapper(f).swap(*this);
    return *this;
  }
};

#endif // FUNCTIONWRAPPER_HPP
Run Code Online (Sandbox Code Playgroud)

Sea*_*ine 8

C++ 11有一个包装器可以做到这一点!它被称为packaged_task.

它的作用是,它包装一个可调用的对象(函数对象,lambda,函数指针,绑定表达式等等),并通过get_future()匹配传入函数的返回类型的方法为您提供未来.

请考虑以下示例:

#include <thread>
#include <future>
#include <functional>
#include <iostream>

using namespace std;

int add(int a, int b)
{
    return a + b;
}

int main()
{
    // Create a std::packaged_task and grab the future out of it.
    packaged_task<int()> myTask(bind(add, 10, 20));
    future<int> myFuture = myTask.get_future();

    // Here, is where you would queue up the task in your example.
    // I'll launch it on another thread just to demonstrate how.
    thread myThread(std::move(myTask));
    myThread.detach();

    // myFuture.get() will block until the task completes.
    // ...or throw if the task throws an exception.
    cout << "The result is: " << myFuture.get() << endl;
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

正如您所看到的promise,我们指望packaged_task创造承诺并给予我们未来,而不是传递给我们.

此外,使用绑定表达式允许我们有效地将参数传递给任务,直到调用它为止.

使用packaged_task也将把未来的异常推到了手中packaged_task.这样,您的功能不需要调用set_exception().他们只需返回或扔.