在Python中执行定期操作

Bru*_*uce 126 python

我在Windows上工作.我想每10秒执行一次函数foo().

我该怎么做呢?

kev*_*kev 172

最后foo(),创建一个在10秒后Timer调用foo()自己的东西.
因为,Timer创建一个新thread的来电foo().
你可以做其他事情而不被阻止.

import time, threading
def foo():
    print(time.ctime())
    threading.Timer(10, foo).start()

foo()

#output:
#Thu Dec 22 14:46:08 2011
#Thu Dec 22 14:46:18 2011
#Thu Dec 22 14:46:28 2011
#Thu Dec 22 14:46:38 2011
Run Code Online (Sandbox Code Playgroud)

  • 这里要注意的一件事是开始时间"漂移".我刚刚进行了测试,我的时间在大约33次迭代中漂移了+ 0.05s.我正在进行1秒的民意调查,这意味着在不到一分钟的时间内就会有20%的漂移.您可以通过在函数开头而不是在结尾调用`threading.Timer`来_reduce_漂移,但前提是它是导致漂移的函数持续时间,而不是定时器不可靠.减少漂移的最佳方法是只在需要的时间内休眠直到下一个预期的运行时间.我将添加一个例子作为另一个答案. (49认同)
  • 这里的内存使用情况如何?感觉就像一个无限的递归呼叫,是吗? (7认同)
  • 嘿,这很酷 (2认同)
  • 这也有每个周期实例化一个新对象(在一个新线程中!)的开销。我找不到这个问题的真正好的解决方案,但我已经考虑了一下,并很快在下面发布了一个使用生成器的答案 (2认同)
  • 这种解决方案有点脆弱。有效负载(在这种情况下为print)引发的任何未捕获的异常(例如IOError)将导致整个计划的终止。我希望有一个解决方案,该解决方案可以更妥善地处理此类问题,并在解决异常原因(例如,满磁盘)后恢复到原始行为。 (2认同)
  • 随着定时器线程的数量不断增长,您将如何杀死它们? (2认同)

Mic*_*son 104

只需睡10秒或使用threading.Timer(10,foo)将导致开始时间漂移.(您可能不关心这一点,或者它可能是问题的重要来源,具体取决于您的具体情况.)可能有两个原因 - 您的线程唤醒时间或功能执行时间不准确.

您可以在本文末尾看到一些结果,但首先是一个如何修复它的示例.您需要跟踪下次调用函数的时间,而不是实际调用函数的时间并考虑差异.

这是一个略有漂移的版本:

import datetime, threading

def foo():
    print datetime.datetime.now()
    threading.Timer(1, foo).start()

foo()
Run Code Online (Sandbox Code Playgroud)

它的输出如下:

2013-08-12 13:05:36.483580
2013-08-12 13:05:37.484931
2013-08-12 13:05:38.485505
2013-08-12 13:05:39.486945
2013-08-12 13:05:40.488386
2013-08-12 13:05:41.489819
2013-08-12 13:05:42.491202
2013-08-12 13:05:43.492486
2013-08-12 13:05:44.493865
2013-08-12 13:05:45.494987
2013-08-12 13:05:46.496479
2013-08-12 13:05:47.497824
2013-08-12 13:05:48.499286
2013-08-12 13:05:49.500232
Run Code Online (Sandbox Code Playgroud)

您可以看到亚秒计数不断增加,因此,开始时间是"漂移".

这是正确解释漂移的代码:

import datetime, threading, time

next_call = time.time()

def foo():
  global next_call
  print datetime.datetime.now()
  next_call = next_call+1
  threading.Timer( next_call - time.time(), foo ).start()

foo()
Run Code Online (Sandbox Code Playgroud)

它的输出如下:

2013-08-12 13:21:45.292565
2013-08-12 13:21:47.293000
2013-08-12 13:21:48.293939
2013-08-12 13:21:49.293327
2013-08-12 13:21:50.293883
2013-08-12 13:21:51.293070
2013-08-12 13:21:52.293393
Run Code Online (Sandbox Code Playgroud)

在这里,您可以看到亚秒级不再有任何增加.

如果您的事件经常发生,您可能希望在单个线程中运行计时器,而不是为每个事件启动新线程.考虑漂移,这看起来像:

import datetime, threading, time

def foo():
    next_call = time.time()
    while True:
        print datetime.datetime.now()
        next_call = next_call+1;
        time.sleep(next_call - time.time())

timerThread = threading.Thread(target=foo)
timerThread.start()
Run Code Online (Sandbox Code Playgroud)

但是,您的应用程序将无法正常退出,您需要终止计时器线程.如果要在应用程序完成后正常退出,而不是手动终止该线程,则应该使用

timerThread = threading.Thread(target=foo)
timerThread.daemon = True
timerThread.start()
Run Code Online (Sandbox Code Playgroud)

  • 为每个调用创建一个线程似乎很浪费.[你可以在一个线程中完成](http://stackoverflow.com/a/20169930/4279) (3认同)
  • @KiaMorot 在处理工作需要更长的时间的情况下,这并不是一个很好的解决方案。在这种情况下,使用 `max(0, next_call - time.time())` 作为睡眠的参数,那么你至少会立即重新启动。 (2认同)

wat*_*nic 40

很惊讶没有找到使用发电机进行计时的解决方案.我只是为了自己的目的设计了这个.

这个解决方案:单线程,每个周期没有对象实例化,使用生成器多次,在时间上稳定下降到time模块的精度(不同于我从堆栈交换尝试的几个解决方案).

注意:对于Python 2.x,请next(g)在下面替换g.next().

import time

def do_every(period,f,*args):
    def g_tick():
        t = time.time()
        count = 0
        while True:
            count += 1
            yield max(t + count*period - time.time(),0)
    g = g_tick()
    while True:
        time.sleep(next(g))
        f(*args)

def hello(s):
    print('hello {} ({:.4f})'.format(s,time.time()))
    time.sleep(.3)

do_every(1,hello,'foo')
Run Code Online (Sandbox Code Playgroud)

结果,例如:

hello foo (1421705487.5811)
hello foo (1421705488.5811)
hello foo (1421705489.5809)
hello foo (1421705490.5830)
hello foo (1421705491.5803)
hello foo (1421705492.5808)
hello foo (1421705493.5811)
hello foo (1421705494.5811)
hello foo (1421705495.5810)
hello foo (1421705496.5811)
hello foo (1421705497.5810)
hello foo (1421705498.5810)
hello foo (1421705499.5809)
hello foo (1421705500.5811)
hello foo (1421705501.5811)
hello foo (1421705502.5811)
hello foo (1421705503.5810)
Run Code Online (Sandbox Code Playgroud)

请注意,此示例包括模拟cpu在每个周期执行其他操作.3秒.如果你每次都改变它是无关紧要的.如果被调用的函数花费的时间超过指定的时间,则yield行中的最大值用于防止sleep负数.在这种情况下,它将立即执行并在下一次执行的时间弥补丢失的时间.

  • 在Python 3.x下,time.sleep(g.next())不起作用.切换到time.sleep(next(g))就可以了. (3认同)
  • 我正在尝试在 Pi 上获取真实世界测量的精确样本。我是一位信号处理老手。这是正确的解决方案。 (3认同)
  • 如此优雅,以至于我很惊讶没有内置程序提供这种行为。感谢分享! (2认同)

Ray*_*ger 11

也许sched模块将满足您的需求.

或者,考虑使用Timer对象.


wim*_*wim 5

这将在每次通话之间插入10秒钟的睡眠foo(),这大约是您在呼叫快速完成时所要求的.

import time
while True:
  foo()
  time.sleep(10)
Run Code Online (Sandbox Code Playgroud)

编辑:foo()在后台线程中调用 时执行其他操作

import time
import sys
import threading

def foo():
  sys.stdout.write('({}) foo\n'.format(time.ctime()))

def foo_target():
  while True:
    foo()
    time.sleep(10)

t = threading.Thread(target=foo_target)
t.daemon = True
t.start()
raw_input('do other things...')
Run Code Online (Sandbox Code Playgroud)


Ric*_*ood 5

这是使用Thread类的一个很好的实现:http://g-off.net/software/a-python-repeatable-threadingtimer-class

下面的代码更快更脏:

from threading import Timer
from time import sleep

def hello():
    print "hello, world"
    t = Timer(3,hello)
    t.start()

t = Timer(3, hello)
t.start() # after 3 seconds, "hello, world" will be printed

# timer will wake up ever 3 seconds, while we do something else
while True:
    print "do something else"
    sleep(10)
Run Code Online (Sandbox Code Playgroud)


jco*_*ado 5

您可以在其他线程中执行任务.threading.Timer如果要执行任务,例如,只要回调返回True(这实际上是glib.timeout_add提供的,但您可能没有在Windows中安装它),或者直到执行任务,您将允许您在一段时间过后执行一次给定的回调你取消它,你可以使用这个代码:

import logging, threading, functools
import time

logging.basicConfig(level=logging.NOTSET,
                    format='%(threadName)s %(message)s')

class PeriodicTimer(object):
    def __init__(self, interval, callback):
        self.interval = interval

        @functools.wraps(callback)
        def wrapper(*args, **kwargs):
            result = callback(*args, **kwargs)
            if result:
                self.thread = threading.Timer(self.interval,
                                              self.callback)
                self.thread.start()

        self.callback = wrapper

    def start(self):
        self.thread = threading.Timer(self.interval, self.callback)
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


def foo():
    logging.info('Doing some work...')
    return True

timer = PeriodicTimer(1, foo)
timer.start()

for i in range(2):
    time.sleep(2)
    logging.info('Doing some other work...')

timer.cancel()
Run Code Online (Sandbox Code Playgroud)

示例输出:

Thread-1 Doing some work...
Thread-2 Doing some work...
MainThread Doing some other work...
Thread-3 Doing some work...
Thread-4 Doing some work...
MainThread Doing some other work...
Run Code Online (Sandbox Code Playgroud)

注意:每次执行间隔时都不会执行回调.间隔是线程在上次完成的回调和下一次调用之间等待的时间.


Car*_*arl 5

这是一个简单的单线程睡眠版本,它会漂移,但会在检测到漂移时尝试自动校正.

注意:这仅在满足以下3个合理假设时才有效:

  1. 时间段远大于正在执行的功能的执行时间
  2. 正在执行的功能在每次呼叫时花费的时间大致相同
  3. 呼叫之间的漂移量小于一秒

-

from datetime import timedelta
from datetime import datetime

def exec_every_n_seconds(n,f):
    first_called=datetime.now()
    f()
    num_calls=1
    drift=timedelta()
    time_period=timedelta(seconds=n)
    while 1:
        time.sleep(n-drift.microseconds/1000000.0)
        current_time = datetime.now()
        f()
        num_calls += 1
        difference = current_time - first_called
        drift = difference - time_period* num_calls
        print "drift=",drift
Run Code Online (Sandbox Code Playgroud)