C++延迟任务

Sta*_*ked 7 c++ multithreading

我想实现一种方法来安排稍后执行的任务.界面类似于JavaScript setTimeout(function, milliseconds).

在我的应用程序中,某些资源由一个线程拥有.为了避免竞争条件,必须始终从同一个线程访问它们.如果其他线程想要访问资源,则必须将任务对象分派给资源线程.

所以我需要解决的两个问题是:

  1. 将任务分派给一个线程
  2. 延迟调用

通过使用无消耗队列快速修复第一个问题,该队列在消费端具有资源线程.(我使用TBB的concurrent_bounded_queue.)然而,第二个问题对我来说并不那么明显.我可以想到两个策略:

  1. 为每个任务启动一个新线程.该线程将睡眠所需的延迟,然后将任务分派到并发队列.
  2. 仅启动一个运行循环的线程,该循环迭代计划的任务,并在等待时间到期时调用它们.

我已经尝试了这两种方法,我倾向于支持第一种,因为它简单可靠,而第二种往往更容易出现微妙的错误.第一种方法将此委托给OS线程调度程序.

但是,第一个解决方案确实会创建大量短期线程,而我通常会听到重用线程的建议.

bob*_*bah 3

手动实施将如下所示。

struct myrunnable {
  uint64_t id_;
  uint64_t stamp_;
  std::function<void()> runnable_;
  uint64_t id() { return id_; }
  uint64_t stamp() { return stamp_; }
  void execute() { if (runnable_) runnable_(); }
};

typedef std::shared_ptr<myrunnable> task_t;
// timestamp_cmp_t - a comparator by timestamp + incrementing task id
typedef tbb::concurrent_blocking_queue<task_t> queue_t;
typedef std::priority_queue<task, timestamp_cmp_t> schedule_t;

uint64_t now(); // a wrapper around gettimeofday(), write yourself

queue_t queue; // inbound concurrent blocking queue not bound in size
schedule_t schedule; // priority queue, a scheduler
// queue_t sink; // optional sink concurrent queue if you don't
                 // want to execute tasks in the scheduler thread context

// now() - a wrapper around gettimeofday(), write yourself
for(;;) { // "termination mark" comments below - exit points
  while (!schedule.empty() && schedule.top().stamp() <= now()) {
    task_t task = schedule.pop();
    task .execute();
    // alternatively sink.push(task) to offload scheduler thread
  }

  if (schedule.empty()) {
    task_t task = queue.pop(); // block on the input queue
    if (!task) return; // scheduler termination mark, empty task
    schedule.push(task);
  } else {
    // Here we are driven by our latency/cpu balance requirements
    // in this example we are ultra low latency and are just spinning CPU
    // and on Linux such thread may need extra tuning to perform consistently.
    // To pace it down one can use TBB's sleep_for() or select() system call

    while (schedule.top().stamp() > now()) {
      task_t task;
      if (queue.try_pop(task)) {
        if (!task) return; // scheduler termination mark, empty task
        schedule.push(task);
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

  • @SigTerm - 紧密旋转循环上方代码中的注释清楚地表明,等待数据到来的方法有很多,包括旋转、基于 nop 的旋转(通过 tbb::sleep_for)或通过系统调用进行深度睡眠顺便说一句,就像 select() 一样,它的睡眠时间不能少于 10 毫秒。你应该停止以黑白方式看待世界:) (2认同)