gez*_*eza 20 c++ linux multithreading pthreads task
我想用C++创建一个非常有效的任务调度程序系统.
基本的想法是这样的:
class Task {
public:
virtual void run() = 0;
};
class Scheduler {
public:
void add(Task &task, double delayToRun);
};
Run Code Online (Sandbox Code Playgroud)
在后面Scheduler,应该有一个固定大小的线程池,它运行任务(我不想为每个任务创建一个线程).delayToRun意味着task不会立即执行,但会在delayToRun几秒后执行(从添加到其中的点开始测量Scheduler).
(当然,这delayToRun意味着"至少"值.如果系统被加载,或者我们从调度程序中询问不可能,它将无法处理我们的请求.但它应该尽力而为)
这是我的问题.如何delayToRun有效地实现功能?我试图通过使用互斥锁和条件变量来解决这个问题.
我看到两种方式:
调度程序包含两个队列:allTasksQueue和tasksReadyToRunQueue.任务被加入allTasksQueue的Scheduler::add.有一个管理线程,其等待的时间量最小,因此它可以从把一个任务allTasksQueue来tasksReadyToRunQueue.工作线程等待可用的任务tasksReadyToRunQueue.
如果Scheduler::add在前面添加一个任务allTasksQueue(一个任务,其值为delayToRunso,它应该在当前最快运行任务之前),那么管理器任务需要被唤醒,因此它可以更新等待时间.
这种方法可以被认为是低效的,因为它需要两个队列,并且它需要两个condvar.signals来运行任务(一个用于allTasksQueue- > tasksReadyToRunQueue,一个用于表示工作线程实际运行任务)
调度程序中有一个队列.任务将添加到此队列中Scheduler::add.工作线程检查队列.如果它是空的,它会在没有时间限制的情况下等待.如果它不为空,则等待最快的任务.
如果只有一个条件变量为工作线程等待:这个方法可以被认为是低效的,因为如果在队列前面添加了一个任务(前面意味着,如果有N个工作线程,那么任务索引<N)然后需要唤醒所有工作线程以更新他们等待的时间.
如果每个线程都有一个单独的条件变量,那么我们可以控制唤醒哪个线程,所以在这种情况下我们不需要唤醒所有线程(我们只需要唤醒具有最大等待时间的线程) ,所以我们需要管理这个值).我目前正在考虑实现这一点,但确切的细节是很复杂的.有关此方法的任何建议/想法/文件吗?
有没有更好的解决方案来解决这个问题?我正在尝试使用标准C++功能,但我愿意使用平台依赖(我的主要平台是linux)工具(如pthreads),甚至是linux特定工具(如futexes),如果它们提供了更好的解决方案.
通过使用单个池线程等待"下次运行"任务的设计,您可以避免使用单独的"管理器"线程,并且必须在下一个运行任务更改时唤醒大量任务(如果有)一个条件变量,其余池线程无限期地等待第二个条件变量.
池线程将沿着以下行执行伪代码:
pthread_mutex_lock(&queue_lock);
while (running)
{
if (head task is ready to run)
{
dequeue head task;
if (task_thread == 1)
pthread_cond_signal(&task_cv);
else
pthread_cond_signal(&queue_cv);
pthread_mutex_unlock(&queue_lock);
run dequeued task;
pthread_mutex_lock(&queue_lock);
}
else if (!queue_empty && task_thread == 0)
{
task_thread = 1;
pthread_cond_timedwait(&task_cv, &queue_lock, time head task is ready to run);
task_thread = 0;
}
else
{
pthread_cond_wait(&queue_cv, &queue_lock);
}
}
pthread_mutex_unlock(&queue_lock);
Run Code Online (Sandbox Code Playgroud)
如果更改下一个要运行的任务,则执行:
if (task_thread == 1)
pthread_cond_signal(&task_cv);
else
pthread_cond_signal(&queue_cv);
Run Code Online (Sandbox Code Playgroud)
随着queue_lock举行.
在这种方案下,所有唤醒都只直接在一个线程上,只有一个任务优先级队列,并且不需要管理器线程.
你的规格太强了:
delayToRun意味着任务不会立即执行,但会在delayToRun几秒钟后执行
你忘了添加"至少":
delayToRun几秒钟后关键在于,如果使用0.1delayToRun 安排了一万个任务,那么它们肯定无法同时运行.
通过这样的更正,你只需维护一些队列(或议程)(预定开始时间,运行闭包),你保持该队列排序,并启动N(一些固定数量)线程,这些线程以原子方式弹出第一个元素议程并运行它.
然后需要唤醒所有工作线程以更新他们等待的时间.
不,一些工作线程会被唤醒.
阅读条件变量和广播.
您也可以使用POSIX定时器,请参阅timer_create(2)或Linux特定的fd定时器,请参阅timerfd_create(2)
你可能会避免运行在你的线程阻塞的系统调用,并有一些中央线程使用一些事件循环管理它们(见调查(2) ...); 否则,如果你有一百个任务正在运行sleep(100),一个任务计划在半秒内运行,它将不会在一百秒之前运行.
您可能想要了解继续传递样式编程(它与-CPS-高度相关).阅读由Juliusz Chroboczek撰写的关于Continuation Passing C的论文.
再看看Qt线程.
您也可以考虑使用Go(使用其Goroutines)进行编码.
这是您提供的接口的示例实现,最接近您的“使用管理器线程”描述。
它使用单个线程 ( timer_thread) 来管理队列 ( allTasksQueue),该队列根据任务必须启动的实际时间 ( std::chrono::time_point) 进行排序。
“队列”是一个std::priority_queue(保持其time_point关键元素排序)。
timer_thread通常会挂起,直到下一个任务开始或添加新任务时。
当一个任务即将运行时,它被放入tasksReadyToRunQueue,其中一个工作线程收到信号,被唤醒,将其从队列中删除并开始处理该任务。
请注意,线程池对线程数有编译时上限 (40)。如果您调度的任务多于可以分派给工作人员的任务,则新任务将阻塞,直到线程再次可用。
你说这种方法效率不高,但总的来说,对我来说似乎相当有效。这都是事件驱动的,您不会因不必要的旋转而浪费 CPU 周期。当然,这只是一个例子,还可以进行优化(注:std::multimap已替换为std::priority_queue)。
该实现符合 C++11 标准
#include <iostream>
#include <chrono>
#include <queue>
#include <unistd.h>
#include <vector>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <memory>
class Task {
public:
virtual void run() = 0;
virtual ~Task() { }
};
class Scheduler {
public:
Scheduler();
~Scheduler();
void add(Task &task, double delayToRun);
private:
using timepoint = std::chrono::time_point<std::chrono::steady_clock>;
struct key {
timepoint tp;
Task *taskp;
};
struct TScomp {
bool operator()(const key &a, const key &b) const
{
return a.tp > b.tp;
}
};
const int ThreadPoolSize = 40;
std::vector<std::thread> ThreadPool;
std::vector<Task *> tasksReadyToRunQueue;
std::priority_queue<key, std::vector<key>, TScomp> allTasksQueue;
std::thread TimerThr;
std::mutex TimerMtx, WorkerMtx;
std::condition_variable TimerCV, WorkerCV;
bool WorkerIsRunning = true;
bool TimerIsRunning = true;
void worker_thread();
void timer_thread();
};
Scheduler::Scheduler()
{
for (int i = 0; i <ThreadPoolSize; ++i)
ThreadPool.push_back(std::thread(&Scheduler::worker_thread, this));
TimerThr = std::thread(&Scheduler::timer_thread, this);
}
Scheduler::~Scheduler()
{
{
std::lock_guard<std::mutex> lck{TimerMtx};
TimerIsRunning = false;
TimerCV.notify_one();
}
TimerThr.join();
{
std::lock_guard<std::mutex> lck{WorkerMtx};
WorkerIsRunning = false;
WorkerCV.notify_all();
}
for (auto &t : ThreadPool)
t.join();
}
void Scheduler::add(Task &task, double delayToRun)
{
auto now = std::chrono::steady_clock::now();
long delay_ms = delayToRun * 1000;
std::chrono::milliseconds duration (delay_ms);
timepoint tp = now + duration;
if (now >= tp)
{
/*
* This is a short-cut
* When time is due, the task is directly dispatched to the workers
*/
std::lock_guard<std::mutex> lck{WorkerMtx};
tasksReadyToRunQueue.push_back(&task);
WorkerCV.notify_one();
} else
{
std::lock_guard<std::mutex> lck{TimerMtx};
allTasksQueue.push({tp, &task});
TimerCV.notify_one();
}
}
void Scheduler::worker_thread()
{
for (;;)
{
std::unique_lock<std::mutex> lck{WorkerMtx};
WorkerCV.wait(lck, [this] { return tasksReadyToRunQueue.size() != 0 ||
!WorkerIsRunning; } );
if (!WorkerIsRunning)
break;
Task *p = tasksReadyToRunQueue.back();
tasksReadyToRunQueue.pop_back();
lck.unlock();
p->run();
delete p; // delete Task
}
}
void Scheduler::timer_thread()
{
for (;;)
{
std::unique_lock<std::mutex> lck{TimerMtx};
if (!TimerIsRunning)
break;
auto duration = std::chrono::nanoseconds(1000000000);
if (allTasksQueue.size() != 0)
{
auto now = std::chrono::steady_clock::now();
auto head = allTasksQueue.top();
Task *p = head.taskp;
duration = head.tp - now;
if (now >= head.tp)
{
/*
* A Task is due, pass to worker threads
*/
std::unique_lock<std::mutex> ulck{WorkerMtx};
tasksReadyToRunQueue.push_back(p);
WorkerCV.notify_one();
ulck.unlock();
allTasksQueue.pop();
}
}
TimerCV.wait_for(lck, duration);
}
}
/*
* End sample implementation
*/
class DemoTask : public Task {
int n;
public:
DemoTask(int n=0) : n{n} { }
void run() override
{
std::cout << "Start task " << n << std::endl;;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << " Stop task " << n << std::endl;;
}
};
int main()
{
Scheduler sched;
Task *t0 = new DemoTask{0};
Task *t1 = new DemoTask{1};
Task *t2 = new DemoTask{2};
Task *t3 = new DemoTask{3};
Task *t4 = new DemoTask{4};
Task *t5 = new DemoTask{5};
sched.add(*t0, 7.313);
sched.add(*t1, 2.213);
sched.add(*t2, 0.713);
sched.add(*t3, 1.243);
sched.add(*t4, 0.913);
sched.add(*t5, 3.313);
std::this_thread::sleep_for(std::chrono::seconds(10));
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5695 次 |
| 最近记录: |