在Python 3中安排重复事件

Hum*_*art 29 python timing scheduled-tasks python-3.x

我正在尝试安排重复事件在Python 3中每分钟运行一次.

我见过上课,sched.scheduler但我想知道是否还有其他方法可以做到.我听说提到我可以使用多个线程,我不介意这样做.

我基本上要求一些JSON,然后解析它; 它的价值随着时间而变化.

要使用sched.scheduler我必须创建一个循环来请求它安排甚至运行一小时:

scheduler = sched.scheduler(time.time, time.sleep)

# Schedule the event. THIS IS UGLY!
for i in range(60):
    scheduler.enter(3600 * i, 1, query_rate_limit, ())

scheduler.run()
Run Code Online (Sandbox Code Playgroud)

有什么其他方法可以做到这一点?

Ale*_*lli 41

您可以使用threading.Timer,但它也会调度一次性事件,类似于.enter调度程序对象的方法.

将一次性调度程序转换为周期性调度程序的正常模式(以任何语言)是使每个事件以指定的间隔重新调度自身.例如,有了sched,我不会像你一样使用循环,而是像:

def periodic(scheduler, interval, action, actionargs=()):
    scheduler.enter(interval, 1, periodic,
                    (scheduler, interval, action, actionargs))
    action(*actionargs)
Run Code Online (Sandbox Code Playgroud)

并通过电话启动整个"永久定期时间表"

periodic(scheduler, 3600, query_rate_limit)
Run Code Online (Sandbox Code Playgroud)

或者,我可以使用threading.Timer而不是scheduler.enter,但模式非常相似.

如果您需要更精细的变化(例如,在给定时间或某些条件下停止定期重新安排),那么就不太容易使用一些额外的参数.

  • 好吧,在java中我有timer.scheduleAtFixedRate()和真正的多线程.每个人都说在python中我们写的代码较少......嗯哼......只是说...... (8认同)
  • 遗憾的是,@ user1685095对任何通用语句都有*always*例外. (3认同)
  • @Wallacoloo是否意味着*不是*总是例外?:) (3认同)
  • @ user1685095不是那么快!尝试编写没有多次导入的代码,扩展TimerTask,提供`run`方法,并添加一个单独的类来运行计时器以及创建所有这些对象.可能大约15行代码.(除非你有更清晰的方法去做;我不是Java最棒的.) (2认同)

Mes*_*ion 18

我对这个问题的谦虚态度:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.function   = function
        self.interval   = interval
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False
Run Code Online (Sandbox Code Playgroud)

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!
Run Code Online (Sandbox Code Playgroud)

特征:

  • 仅限标准库,无外部依赖项
  • 使用Alex Martnelli建议的模式
  • start()stop()即使定时器已经启动/停止,也可以安全地多次呼叫
  • 要调用的函数可以有位置和命名参数
  • 您可以interval随时更改,它将在下次运行后生效.同样的args,kwargs甚至是function!


dba*_*der 16

你可以使用时间表.它适用于Python 2.7和3.3,并且非常轻量级:

import schedule
import time

def job():
   print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)

while 1:
   schedule.run_pending()
   time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

  • 为什么while循环?,它不会像cron作业一样运行吗? (3认同)
  • 使用 GUI 时会挂起 (3认同)

jor*_*xou 6

您可以使用Advanced Python Scheduler.它甚至有一个类似cron的界面.


fdb*_*fdb 6

基于MestreLion答案,它解决了多线程的一个小问题:

from threading import Timer, Lock


class Periodic(object):
    """
    A periodic task running in threading.Timers
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._lock = Lock()
        self._timer = None
        self.function = function
        self.interval = interval
        self.args = args
        self.kwargs = kwargs
        self._stopped = True
        if kwargs.pop('autostart', True):
            self.start()

    def start(self, from_run=False):
        self._lock.acquire()
        if from_run or self._stopped:
            self._stopped = False
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self._lock.release()

    def _run(self):
        self.start(from_run=True)
        self.function(*self.args, **self.kwargs)

    def stop(self):
        self._lock.acquire()
        self._stopped = True
        self._timer.cancel()
        self._lock.release()
Run Code Online (Sandbox Code Playgroud)


use*_*ser 6

使用芹菜.

from celery.task import PeriodicTask
from datetime import timedelta


class ProcessClicksTask(PeriodicTask):
    run_every = timedelta(minutes=30)

    def run(self, **kwargs):
        #do something
Run Code Online (Sandbox Code Playgroud)


nor*_*ree 6

根据 Alex Martelli 的回答,我实现了更容易集成的装饰器版本。

import sched
import time
import datetime
from functools import wraps
from threading import Thread


def async(func):
    @wraps(func)
    def async_func(*args, **kwargs):
        func_hl = Thread(target=func, args=args, kwargs=kwargs)
        func_hl.start()
        return func_hl
    return async_func


def schedule(interval):
    def decorator(func):
        def periodic(scheduler, interval, action, actionargs=()):
            scheduler.enter(interval, 1, periodic,
                            (scheduler, interval, action, actionargs))
            action(*actionargs)

        @wraps(func)
        def wrap(*args, **kwargs):
            scheduler = sched.scheduler(time.time, time.sleep)
            periodic(scheduler, interval, func)
            scheduler.run()
        return wrap
    return decorator


@async
@schedule(1)
def periodic_event():
    print(datetime.datetime.now())


if __name__ == '__main__':
    print('start')
    periodic_event()
    print('end')
Run Code Online (Sandbox Code Playgroud)