如何在C++中创建高效的多线程任务调度程序?

gez*_*eza 20 c++ linux multithreading pthreads task

我想用C++创建一个非常有效的任务调度程序系统.

基本的想法是这样的:

class Task {
    public:
        virtual void run() = 0;
};

class Scheduler {
    public:
        void add(Task &task, double delayToRun);
};
Run Code Online (Sandbox Code Playgroud)

在后面Scheduler,应该有一个固定大小的线程池,它运行任务(我不想为每个任务创建一个线程).delayToRun意味着task不会立即执行,但会在delayToRun几秒后执行(从添加到其中的点开始测量Scheduler).

(当然,这delayToRun意味着"至少"值.如果系统被加载,或者我们从调度程序中询问不可能,它将无法处理我们的请求.但它应该尽力而为)

这是我的问题.如何delayToRun有效地实现功能?我试图通过使用互斥锁和条件变量来解决这个问题.

我看到两种方式:

用经理线程

调度程序包含两个队列:allTasksQueuetasksReadyToRunQueue.任务被加入allTasksQueueScheduler::add.有一个管理线程,其等待的时间量最小,因此它可以从把一个任务allTasksQueuetasksReadyToRunQueue.工作线程等待可用的任务tasksReadyToRunQueue.

如果Scheduler::add在前面添加一个任务allTasksQueue(一个任务,其值为delayToRunso,它应该在当前最快运行任务之前),那么管理器任务需要被唤醒,因此它可以更新等待时间.

这种方法可以被认为是低效的,因为它需要两个队列,并且它需要两个condvar.signals来运行任务(一个用于allTasksQueue- > tasksReadyToRunQueue,一个用于表示工作线程实际运行任务)

没有经理线程

调度程序中有一个队列.任务将添加到此队列中Scheduler::add.工作线程检查队列.如果它是空的,它会在没有时间限制的情况下等待.如果它不为空,则等待最快的任务.

  1. 如果只有一个条件变量为工作线程等待:这个方法可以被认为是低效的,因为如果在队列前面添加了一个任务(前面意味着,如果有N个工作线程,那么任务索引<N)然后需要唤醒所有工作线程以更新他们等待的时间.

  2. 如果每个线程都有一个单独的条件变量,那么我们可以控制唤醒哪个线程,所以在这种情况下我们不需要唤醒所有线程(我们只需要唤醒具有最大等待时间的线程) ,所以我们需要管理这个值).我目前正在考虑实现这一点,但确切的细节是很复杂的.有关此方法的任何建议/想法/文件吗?


有没有更好的解决方案来解决这个问题?我正在尝试使用标准C++功能,但我愿意使用平台依赖(我的主要平台是linux)工具(如pthreads),甚至是linux特定工具(如futexes),如果它们提供了更好的解决方案.

caf*_*caf 8

通过使用单个池线程等待"下次运行"任务的设计,您可以避免使用单独的"管理器"线程,并且必须在下一个运行任务更改时唤醒大量任务(如果有)一个条件变量,其余池线程无限期地等待第二个条件变量.

池线程将沿着以下行执行伪代码:

pthread_mutex_lock(&queue_lock);

while (running)
{
    if (head task is ready to run)
    {
        dequeue head task;
        if (task_thread == 1)
            pthread_cond_signal(&task_cv);
        else
            pthread_cond_signal(&queue_cv);

        pthread_mutex_unlock(&queue_lock);
        run dequeued task;
        pthread_mutex_lock(&queue_lock);
    }
    else if (!queue_empty && task_thread == 0)
    {
        task_thread = 1;
        pthread_cond_timedwait(&task_cv, &queue_lock, time head task is ready to run);
        task_thread = 0;
    }
    else
    {
        pthread_cond_wait(&queue_cv, &queue_lock);
    }
}

pthread_mutex_unlock(&queue_lock);
Run Code Online (Sandbox Code Playgroud)

如果更改下一个要运行的任务,则执行:

if (task_thread == 1)
    pthread_cond_signal(&task_cv);
else
    pthread_cond_signal(&queue_cv);
Run Code Online (Sandbox Code Playgroud)

随着queue_lock举行.

在这种方案下,所有唤醒都只直接在一个线程上,只有一个任务优先级队列,并且不需要管理器线程.


Bas*_*tch 6

你的规格太强了:

delayToRun意味着任务不会立即执行,但会在delayToRun几秒钟后执行

你忘了添加"至少":

  • 该任务现在不会被执行,但至少 delayToRun几秒钟后

关键在于,如果使用0.1delayToRun 安排了一万个任务,那么它们肯定无法同时运行.

通过这样的更正,你只需维护一些队列(或议程)(预定开始时间,运行闭包),你保持该队列排序,并启动N(一些固定数量)线程,这些线程以原子方式弹出第一个元素议程并运行它.

然后需要唤醒所有工作线程以更新他们等待的时间.

不,一些工作线程会被唤醒.

阅读条件变量和广播.

您也可以使用POSIX定时器,请参阅timer_create(2)或Linux特定的fd定时器,请参阅timerfd_create(2)

你可能会避免运行在你的线程阻塞的系统调用,并有一些中央线程使用一些事件循环管理它们(见调查(2) ...); 否则,如果你有一百个任务正在运行sleep(100),一个任务计划在半秒内运行,它将不会在一百秒之前运行.

您可能想要了解继续传递样式编程(它与-CPS-高度相关).阅读由Juliusz Chroboczek撰写的关于Continuation Passing C的论文.

再看看Qt线程.

您也可以考虑使用Go(使用其Goroutines)进行编码.


LWi*_*sey 5

这是您提供的接口的示例实现,最接近您的“使用管理器线程”描述。

它使用单个线程 ( timer_thread) 来管理队列 ( allTasksQueue),该队列根据任务必须启动的实际时间 ( std::chrono::time_point) 进行排序。
“队列”是一个std::priority_queue(保持其time_point关键元素排序)。

timer_thread通常会挂起,直到下一个任务开始或添加新任务时。
当一个任务即将运行时,它被放入tasksReadyToRunQueue,其中一个工作线程收到信号,被唤醒,将其从队列中删除并开始处理该任务。

请注意,线程池对线程数有编译时上限 (40)。如果您调度的任务多于可以分派给工作人员的任务,则新任务将阻塞,直到线程再次可用。

你说这种方法效率不高,但总的来说,对我来说似乎相当有效。这都是事件驱动的,您不会因不必要的旋转而浪费 CPU 周期。当然,这只是一个例子,还可以进行优化(注:std::multimap已替换为std::priority_queue)。

该实现符合 C++11 标准

#include <iostream>
#include <chrono>
#include <queue>
#include <unistd.h>
#include <vector>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <memory>

class Task {
public:
    virtual void run() = 0;
    virtual ~Task() { }
};

class Scheduler {
public:
    Scheduler();
    ~Scheduler();

    void add(Task &task, double delayToRun);

private:
    using timepoint = std::chrono::time_point<std::chrono::steady_clock>;

    struct key {
        timepoint tp;
        Task *taskp;
    };

    struct TScomp {
        bool operator()(const key &a, const key &b) const
        {
            return a.tp > b.tp;
        }
    };

    const int ThreadPoolSize = 40;

    std::vector<std::thread> ThreadPool;
    std::vector<Task *> tasksReadyToRunQueue;

    std::priority_queue<key, std::vector<key>, TScomp> allTasksQueue;

    std::thread TimerThr;
    std::mutex TimerMtx, WorkerMtx;
    std::condition_variable TimerCV, WorkerCV;

    bool WorkerIsRunning = true;
    bool TimerIsRunning = true;

    void worker_thread();
    void timer_thread();
};

Scheduler::Scheduler()
{
    for (int i = 0; i <ThreadPoolSize; ++i)
        ThreadPool.push_back(std::thread(&Scheduler::worker_thread, this));

    TimerThr = std::thread(&Scheduler::timer_thread, this);
}

Scheduler::~Scheduler()
{
    {
        std::lock_guard<std::mutex> lck{TimerMtx};
        TimerIsRunning = false;
        TimerCV.notify_one();
    }
    TimerThr.join();

    {
        std::lock_guard<std::mutex> lck{WorkerMtx};
        WorkerIsRunning = false;
        WorkerCV.notify_all();
    }
    for (auto &t : ThreadPool)
        t.join();
}

void Scheduler::add(Task &task, double delayToRun)
{
    auto now = std::chrono::steady_clock::now();
    long delay_ms = delayToRun * 1000;

    std::chrono::milliseconds duration (delay_ms);

    timepoint tp = now + duration;

    if (now >= tp)
    {
        /*
         * This is a short-cut
         * When time is due, the task is directly dispatched to the workers
         */
        std::lock_guard<std::mutex> lck{WorkerMtx};
        tasksReadyToRunQueue.push_back(&task);
        WorkerCV.notify_one();

    } else
    {
        std::lock_guard<std::mutex> lck{TimerMtx};

        allTasksQueue.push({tp, &task});

        TimerCV.notify_one();
    }
}

void Scheduler::worker_thread()
{
    for (;;)
    {
        std::unique_lock<std::mutex> lck{WorkerMtx};

        WorkerCV.wait(lck, [this] { return tasksReadyToRunQueue.size() != 0 ||
                                           !WorkerIsRunning; } );

        if (!WorkerIsRunning)
            break;

        Task *p = tasksReadyToRunQueue.back();
        tasksReadyToRunQueue.pop_back();

        lck.unlock();

        p->run();

        delete p; // delete Task
    }
}

void Scheduler::timer_thread()
{
    for (;;)
    {
        std::unique_lock<std::mutex> lck{TimerMtx};

        if (!TimerIsRunning)
            break;

        auto duration = std::chrono::nanoseconds(1000000000);

        if (allTasksQueue.size() != 0)
        {
            auto now = std::chrono::steady_clock::now();

            auto head = allTasksQueue.top();
            Task *p = head.taskp;

            duration = head.tp - now;
            if (now >= head.tp)
            {
                /*
                 * A Task is due, pass to worker threads
                 */
                std::unique_lock<std::mutex> ulck{WorkerMtx};
                tasksReadyToRunQueue.push_back(p);
                WorkerCV.notify_one();
                ulck.unlock();

                allTasksQueue.pop();
            }
        }

        TimerCV.wait_for(lck, duration);
    }
}
/*
 * End sample implementation
 */



class DemoTask : public Task {
    int n;
public:
    DemoTask(int n=0) : n{n} { }
    void run() override
    {
        std::cout << "Start task " << n << std::endl;;
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << " Stop task " << n << std::endl;;
    }
};

int main()
{
    Scheduler sched;

    Task *t0 = new DemoTask{0};
    Task *t1 = new DemoTask{1};
    Task *t2 = new DemoTask{2};
    Task *t3 = new DemoTask{3};
    Task *t4 = new DemoTask{4};
    Task *t5 = new DemoTask{5};

    sched.add(*t0, 7.313);
    sched.add(*t1, 2.213);
    sched.add(*t2, 0.713);
    sched.add(*t3, 1.243);
    sched.add(*t4, 0.913);
    sched.add(*t5, 3.313);

    std::this_thread::sleep_for(std::chrono::seconds(10));
}
Run Code Online (Sandbox Code Playgroud)