Python 线程定时器如何在内部工作?

San*_*Kim 1 python multithreading timer scheduler

我想知道python是如何threading.Timer工作的。

更详细地说,当我运行几个时threading.Timer,它是否运行单独的线程来计算时间并运行处理程序?

或者一个线程一起管理和计数几个计时器?

I am asking because my application need to schedule many event, But

If threading.Timer runs separate each thread for counting a timer, and i run many timers, it may affect performance so much.

So i am worry that if i have to implement a scheduler running only one thread if it has big effect in performance.

fre*_*ish 5

threading.Timerclass 是 的子类,threading.Thread基本上它只是运行一个单独的线程,在该线程中休眠指定的时间并运行相应的函数。

这绝对不是安排事件的有效方法。更好的方法是在单个线程中进行调度,方法是使用Queue.PriorityQueue将事件放在“优先级”实际上意味着“下一个触发日期”的位置。类似于 cron 的工作方式。

或者甚至更好:使用已经存在的东西,不要重新发明轮子:Cron、Celery 等等……

通过Queue.PriorityQueue以下方式制作调度程序的非常简化的示例:

import time
from Queue import PriorityQueue

class Task(object):
    def __init__(self, fn, crontab):
        # TODO: it should be possible to pass args, kwargs
        # so that fn can be called with fn(*args, **kwargs)
        self.fn = fn
        self.crontab = crontab

    def get_next_fire_date(self):
        # TODO: evaluate next fire date based on self.crontab
        pass

class Scheduler(object):
    def __init__(self):
        self.event_queue = PriorityQueue()
        self.new_task = False

    def schedule_task(self, fn, crontab):
        # TODO: add scheduling language, crontab or something
        task = Task(fn, crontab)
        next_fire = task.get_next_fire_date()
        if next_fire:
            self.new_task = True
            self.event_queue.put((next_fire, task))

    def run(self):
        self.new_task = False

        # TODO: do we really want an infinite loop?
        while True:
            # TODO: actually we want .get() with timeout and to handle
            # the case when the queue is empty
            next_fire, task = self.event_queue.get()

            # incremental sleep so that we can check
            # if new tasks arrived in the meantime
            sleep_for = int(next_fire - time.time())
            for _ in xrange(sleep_for):
                time.sleep(1)
                if self.new_task:
                    self.new_task = False
                    self.event_queue.put((next_fire, task))
                    continue

            # TODO: run in separate thread?
            task.fn()

            time.sleep(1)
            next_fire = task.get_next_fire_date()

            if next_fire:
                event_queue.put((next_fire, task))

def test():
    return 'hello world'

sch = Scheduler()
sch.schedule_task(test, '5 * * * *')
sch.schedule_task(test, '0 22 * * 1-5')
sch.schedule_task(test, '1 1 * * *')
sch.run()
Run Code Online (Sandbox Code Playgroud)

这只是一个想法。您必须正确实现TaskScheduler类,即get_next_fire_date方法加上某种调度语言(crontab?)和错误处理。我仍然强烈建议使用现有的库之一。