在Python中每x秒重复执行一次函数的最佳方法是什么?

Dav*_*idM 245 python timer

我想永远每60秒在Python中重复执行一个函数(就像目标C中的NSTimer一样).这段代码将作为守护进程运行,实际上就像使用cron每分钟调用python脚本一样,但不需要用户设置.

这个关于用Python实现的cron的问题中,解决方案似乎实际上只是sleep() x秒.我不需要这样的高级功能,所以也许这样的东西可行

while True:
    # Code executed here
    time.sleep(60)
Run Code Online (Sandbox Code Playgroud)

这段代码有可预见的问题吗?

nos*_*klo 208

使用sched模块,它实现了一个通用的事件调度程序.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print "Doing stuff..."
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()
Run Code Online (Sandbox Code Playgroud)

  • sched模块用于调度函数在一段时间后运行,如何使用它来每隔x秒重复一次函数调用而不使用time.sleep()? (12认同)
  • @JavaSa:因为*"做你的东西"*不是瞬间的,来自`time.sleep`的错误可能在这里积累."每隔X秒执行一次"和"反复执行延迟~X秒"是不一样的.另请参阅[此评论](http://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds-in-python/474543 ?noredirect = 1#comment64793337_25251804) (7认同)
  • 注意:此版本可能会漂移.您可以使用`enterabs()`来避免它.这是[用于比较的非漂移版本](http://stackoverflow.com/a/25251804/4279). (5认同)
  • 您可以将 s.enter(...) 移动到函数的开头以减少漂移。另外,“sc”有什么意义? (4认同)
  • @Baishampayan:安排新的跑步. (2认同)
  • 然后在http://packages.python.org/APScheduler/上的apscheduler也应该在这一点上提到. (2认同)
  • 谢谢,接受的答案应该改变,并非所有人都阅读评论...... (2认同)

Dav*_*ove 154

只需将时间循环锁定到系统时钟即可.简单.

import time
starttime=time.time()
while True:
  print "tick"
  time.sleep(60.0 - ((time.time() - starttime) % 60.0))
Run Code Online (Sandbox Code Playgroud)

  • +1.你的和`twisted`答案是每隔一个'x`秒运行一个函数的唯一答案.其余的执行函数,每次调用后延迟"x"秒. (18认同)
  • 如果你在哪里添加一些代码,这需要超过一秒......它会抛出时间并开始落后......在这种情况下接受的答案是正确的...任何人都可以循环一个简单的打印命令让它每秒都运行一次...... (11认同)
  • 工作非常好.如果你开始将它同步到某个时间,就没有必要减去你的`starttime`:`time.sleep(60 - time.time()%60)`对我来说一直很好.我用它作为`time.sleep(1200 - time.time()%1200)`它给我记录了`:00:20:40`,正如我想要的那样. (10认同)
  • 我更喜欢"从时间导入时间,睡眠",因为存在的影响;) (4认同)
  • @AntonSchigur避免多次迭代后的漂移.单个迭代可能迟早或稍后开始,具体取决于`sleep()`,`timer()`精度以及执行循环体所需的时间,但平均迭代总是发生在区间边界上(即使有些被跳过) :[`while keep_doing_it():sleep(interval - timer()%interval)`](http://stackoverflow.com/a/26609843).与比较,它只是'而keep_doing_it():睡眠(间隔)`这里经过几次反复的错误可能会积累. (2认同)
  • @TrainHeartnet 当忽略模数时,`time.time() - starttime` 的结果将大于设置的时间(在这种情况下为 60),因此 `60 - (time.time() - starttime) 的结果` 将是负数,这会导致睡眠功能冻结(不完全是,但它只是等待大量时间)。在这种情况下,%60 只是防止它变得大于 60。 (2认同)
  • `time.monotonic()` 在这里更合适,以避免夏令时和其他相关问题。 (2认同)

Aar*_*paa 67

您可能想要考虑Twisted,它是一个实现Reactor Pattern的Python网络库.

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()
Run Code Online (Sandbox Code Playgroud)

虽然"while True:sleep(60)"可能会工作Twisted可能已经实现了你最终需要的许多功能(如bobince所指出的守护进程,日志记录或异常处理),并且可能是一个更强大的解决方案

  • 扭曲非常酷,但对于所描述的特定问题来说似乎有点矫枉过正。 (5认同)
  • 我知道Twisted可以做到这一点.感谢分享示例代码! (2认同)
  • 这个漂移在毫秒级 (2认同)
  • “毫秒级漂移”是什么意思? (2认同)

Mes*_*ion 56

如果你想要一种非阻塞方式来定期执行你的函数,而不是阻塞无限循环,我会使用一个线程计时器.这样,您的代码可以继续运行并执行其他任务,并且每隔n秒仍然会调用您的函数.我在很长的CPU /磁盘/网络密集型任务中使用这种技术来打印进度信息.

这是我在类似问题中发布的代码,包含start()和stop()控件:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False
Run Code Online (Sandbox Code Playgroud)

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!
Run Code Online (Sandbox Code Playgroud)

特征:

  • 仅限标准库,无外部依赖项
  • start()stop()即使定时器已经启动/停止,也可以安全地多次呼叫
  • 要调用的函数可以有位置和命名参数
  • 您可以interval随时更改,它将在下次运行后生效.同样的args,kwargs甚至是function!

  • 我想我已经深究了。@MestreLion 的解决方案每 x 秒运行一个函数(即 t=0, t=1x, t=2x, t=3x, ...),其中在原始海报示例代码中以 *x* 秒间隔运行一个函数之间。此外,如果`interval` 比执行`function` 所需的时间短,我认为这个解决方案有一个错误。在这种情况下,`self._timer` 将在 `start` 函数中被覆盖。 (2认同)

Itx*_*aka 31

我认为更简单的方法是:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()
Run Code Online (Sandbox Code Playgroud)

这样你的代码就会被执行,然后等待60秒然后它再次执行,等待,执行等......不需要复杂化的东西:D

  • 实际上这不是答案:time sleep()只能在每次执行后等待X秒.例如,如果您的函数需要0.5秒执行并且您使用time.sleep(1),则表示您的函数每1.5秒执行一次,而不是1.您应该使用其他模块和/或线程来确保某些内容适用于Y次在每X秒. (37认同)
  • 在我看来,代码应该在 `while True` 循环中调用 `time.sleep()`,如: `def executeSomething(): print('10 sec left') ; 而真:executeSomething(); 时间.睡眠(10)` (2认同)

era*_*oul 18

以下是MestreLion代码的更新,可以避免随着时间的推移而进行漫游:

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False
Run Code Online (Sandbox Code Playgroud)

  • 我同意这是最好的 - 没有第 3 方软件包,并且我已经测试过它不会随着时间的推移而漂移 (2认同)
  • 请注意,这会遇到同样的问题,因为它将为每个调用创建线程,而没有任何方法来捕获这些线程内发生的错误。 (2认同)

Alf*_*lfe 18

import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)
Run Code Online (Sandbox Code Playgroud)

如果你想在不阻塞剩余代码的情况下执行此操作,可以使用它来让它在自己的线程中运行:

import threading
threading.Thread(target=lambda: every(5, foo)).start()
Run Code Online (Sandbox Code Playgroud)

该解决方案结合了其他解决方案中很少结合的几种功能:

  • 异常处理:在此级别上尽可能正确处理异常,即在不中止程序的情况下记录以进行调试.
  • 没有链接:你在许多答案中找到的常见的链式实现(用于调度下一个事件)在调度机制(threading.Timer或其他)中出现任何问题时会很脆弱,这将终止链.即使问题的原因已经解决,也不会再发生进一步的执行.一个简单的循环和等待简单sleep()相比,更加强大.
  • 没有漂移:我的解决方案可以准确跟踪它应该运行的时间.根据执行时间没有漂移(如许多其他解决方案中那样).
  • 跳过:如果一次执行占用太多时间,我的解决方案将跳过任务(例如,每五秒执行一次X,但X需要6秒).这是标准的cron行为(并且有充分的理由).然后,许多其他解决方案只是连续几次执行任务而没有任何延迟.对于大多数情况(例如清理任务),这是不希望的.如果希望,简单地使用next_time += delay来代替.

  • 不漂流的最佳答案。 (2认同)
  • 在 Python 中,多亏了 GIL,在两个线程中访问变量是非常安全的。并且仅仅在两个线程中读取应该永远不会成为问题(在其他线程环境中也不应该)。只有在没有 GIL 的系统中(例如在 Java、C++ 等中)从两个不同的线程写入需要一些显式同步。 (2认同)

Lea*_*ple 13

我最终使用了schedule模块。API很好。

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

  • 使用 `gevent.spawn()` 让它不会阻塞你的主线程。我调用了一个方法来处理我的所有调度程序初始化,并且它工作得非常好。 (2认同)

bob*_*nce 6

它与 cron 之间的主要区别在于,异常将永远杀死守护进程。您可能希望使用异常捕获器和记录器进行包装。


RCv*_*ram 6

另一种灵活性解决方案是Apscheduler

pip install apscheduler
Run Code Online (Sandbox Code Playgroud)
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
  pass

sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds

sched.start()
Run Code Online (Sandbox Code Playgroud)

此外,apscheduler 提供了许多调度程序,如下所示。

  • BlockingScheduler:当调度程序是你的进程中唯一运行的东西时使用

  • BackgroundScheduler:当您不使用以下任何框架时使用,并且希望调度程序在您的应用程序内在后台运行

  • AsyncIOScheduler:如果您的应用程序使用 asyncio 模块,请使用

  • GeventScheduler:如果您的应用程序使用 gevent,请使用

  • TornadoScheduler:如果您正在构建 Tornado 应用程序,请使用

  • TwistedScheduler:在构建 Twisted 应用程序时使用

  • QtScheduler:如果您正在构建 Qt 应用程序,请使用


Ana*_*nay 5

一段时间后我遇到了类似的问题.可能是http://cronus.readthedocs.org可能有帮助吗?

对于v0.2,以下代码段有效

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec
Run Code Online (Sandbox Code Playgroud)


Ada*_*hes 5

如果漂移不是问题

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)
    
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

Run Code Online (Sandbox Code Playgroud)

其中异步输出。

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018
Run Code Online (Sandbox Code Playgroud)

如果正在运行的任务需要相当长的时间,则间隔变为 2 秒 + 任务时间,因此如果您需要精确调度,那么这不适合您。

请注意该daemon=True标志意味着该线程不会阻止应用程序关闭。例如,pytest在运行测试等待此头停止后无限期挂起的问题。

  • 不,它只打印第一个日期时间,然后停止...... (2认同)
  • @eraoul 我知道,我的回答确实提到了这一点。我已加粗该部分,以便更好地突出。 (2认同)