将执行从一个线程移动到另一个线程以实现任务并行和逐个调用

pau*_*ens 5 c++ parallel-processing concurrency multithreading c++11

我正在尝试在C++中实现一个call-by-future机制.虽然这只是一个测试代码(有点匆忙),但我打算使用类似于我正在研究的语言运行时的透明并行性.

我已经干了我正在努力的代码,使它变得更小,尽管它仍然很大:

#include <cstdlib>
#include <cstdio>
#include <iostream>
#include <vector>
#include <queue>
#include <future>
#include <thread>
#include <functional>
#include <type_traits>
#include <utility>
using namespace std;
using namespace std::chrono;

//------------------------------------------------------------------------------
// Simple locked printer

static std::recursive_mutex print_lock;

inline void print_() {
  return;
};

template<typename T, typename... Args>
inline void print_(T t, Args... args) {
  print_lock.lock();
  std::cout << t;
  print_(args...);
  print_lock.unlock();
};
//------------------------------------------------------------------------------

template<typename R>
class PooledTask {
  public:
    explicit PooledTask(function<R()>);

    // Possibly execute the task and return the value
    R &operator () () {

      // If we can get the lock, we're not executing
      if(lock.try_lock()) {

        // We may already have executed it
        if(done)
          goto end;

        // Otherwise, execute it now
        try {
          result = move(task());
        } catch(...) {
          // If an exception is thrown, save it for later
          eptr = current_exception();
          failed = true;
        };

        done = true;

        goto end;

      } else {

        // Wait until the task is completed
        lock.lock();

        end: {
          lock.unlock();

          // Maybe we got an exception!
          if(failed)
            rethrow_exception(eptr);

          // Otherwise, just return the result
          return result;
        };
      };
    };

  private:
    exception_ptr eptr;
    function<R()> task;
    bool done;
    bool failed;
    mutex lock;
    R result;
};

extern class TaskPool pool;

class TaskPool {
  public:
    TaskPool() noexcept: TaskPool(thread::hardware_concurrency() - 1) {
      return;
    };

    TaskPool(const TaskPool &) = delete;
    TaskPool(TaskPool &&) = delete;

    template<typename T>
    void push(PooledTask<T> *task) noexcept {

      lock_guard<mutex> guard(lock);

      builders.push([=] {
        try {
          (*task)();
        } catch(...) {
          // Ignore it here! The task will save it. :)
        };
      });

    };

    ~TaskPool() {
      // TODO: wait for all tasks to finish...
    };
  private:
    queue<thread *> threads;
    queue<function<void()>> builders;
    mutex lock;

    TaskPool(signed N) noexcept {
      while(N --> 0)
        threads.push(new thread([this, N] {
          for(;;) {

            pop_task();

          };
        }));
    };

    void pop_task() noexcept {

      lock.lock();

      if(builders.size()) {

        auto task = builders.front();

        builders.pop();

        lock.unlock();

        task();

      } else
        lock.unlock();
    };

} pool;


template<typename R>
PooledTask<R>::PooledTask(function<R()> fun):
  task(fun),
  done(false),
  failed(false)
{
  pool.push(this);
};

// Should probably return a std::shared_ptr here...
template<typename F, typename... Args>
auto byfuture(F fun, Args&&... args) noexcept ->
  PooledTask<decltype(fun(args...))> *
{

  using R = decltype(fun(args...));

  auto pooled = new PooledTask<R> {
    bind(fun, forward<Args>(args)...)
  };

  return pooled;
};


//------------------------------------------------------------------------------
#include <map>

// Get the current thread id as a simple number
static int myid() noexcept {
  static unsigned N = 0;
  static map<thread::id, unsigned> hash;
  static mutex lock;

  lock_guard<mutex> guard(lock);

  auto current = this_thread::get_id();

  if(!hash[current])
    hash[current] = ++N;

  return hash[current];
};
//------------------------------------------------------------------------------

//------------------------------------------------------------------------------
// The fibonacci test implementation
int future_fib(int x, int parent) {

  if(x < 3)
    return 1;

  print_("future_fib(", x, ")", " on thread ", myid(), \
         ", asked by thread ", parent, "\n");

  auto f1 = byfuture(future_fib, x - 1, myid());
  auto f2 = byfuture(future_fib, x - 2, myid());

  auto res = (*f1)() + (*f2)();

  delete f1;
  delete f2;

  return res;
};
//------------------------------------------------------------------------------

int main() {
  // Force main thread to get id 1
  myid();

  // Get task
  auto f = byfuture(future_fib, 8, myid());

  // Make sure it starts on the task pool
  this_thread::sleep_for(seconds(1));

  // Blocks
  (*f)();

  // Simply wait to be sure all threads are clean
  this_thread::sleep_for(seconds(2));

  //
  return EXIT_SUCCESS;
};
Run Code Online (Sandbox Code Playgroud)

这个程序的结果是这样的(我有一个四核,所以池中有3个线程):

future_fib(8) on thread 2, asked by thread 1
future_fib(7) on thread 3, asked by thread 2
future_fib(6) on thread 4, asked by thread 2
future_fib(6) on thread 3, asked by thread 3
future_fib(5) on thread 4, asked by thread 4
future_fib(5) on thread 3, asked by thread 3
future_fib(4) on thread 4, asked by thread 4
future_fib(4) on thread 3, asked by thread 3
future_fib(3) on thread 4, asked by thread 4
future_fib(3) on thread 3, asked by thread 3
future_fib(3) on thread 4, asked by thread 4
future_fib(3) on thread 3, asked by thread 3
future_fib(4) on thread 4, asked by thread 4
future_fib(4) on thread 3, asked by thread 3
future_fib(3) on thread 4, asked by thread 4
future_fib(3) on thread 3, asked by thread 3
future_fib(5) on thread 3, asked by thread 3
future_fib(4) on thread 3, asked by thread 3
future_fib(3) on thread 3, asked by thread 3
future_fib(3) on thread 3, asked by thread 3
Run Code Online (Sandbox Code Playgroud)

与普通的斐波那契函数相比,这种实现方式非常慢.

所以这里的问题是:当池运行时fib(8),它将创建两个将在下一个线程上运行的任务,但是,当它到达时auto res = (*f1)() + (*f2)();,两个任务都已经运行,因此它将阻塞f1(在线程3 上运行).

为了提高速度,我需要做的是为线程2而不是阻塞f1,假设执行任何线程3正在做的事情,让它准备好接受另一个任务,因此没有线程会在休眠时进行计算.

这篇文章http://bartoszmilewski.com/2011/10/10/async-tasks-in-c11-not-quite-there-yet/表示有必要做我想做的事,但没有具体说明.

我怀疑的是:我怎么可能这样做?

还有其他选择做我想要的吗?

Max*_*kin 1

我认为您可能有机会使用当前为 C++ 标准化提出的可恢复函数。该提案尚未获得批准,但 Visual Studio 15 CTP 实现了该提案,因此您可以尝试制作原型(如果可以使用 MSVC 编译器)。

Gor Nishanov(最新提案论文的作者之一)在他的 CppCon 演讲中描述了一个非常相似的示例,即从 23:47 开始使用“父窃取调度”计算斐波那契:https://www.youtube.com/watch? v=KUhSjfSbINE

但请注意,我找不到 的实现的任何来源/示例spawnable<T>,因此您可能需要联系提案作者以获取详细信息。