每n秒运行一定的代码

Joh*_*ard 129 python multithreading

有没有办法,例如,Hello World!每n秒打印一次?例如,程序将通过我拥有的任何代码,然后一旦它有5秒(有time.sleep())它将执行该代码.我会使用它来更新文件,而不是打印Hello World.

例如:

startrepeat("print('Hello World')", .01) # Repeats print('Hello World') ever .01 seconds

for i in range(5):
    print(i)

>> Hello World!
>> 0
>> 1
>> 2
>> Hello World!
>> 3
>> Hello World!
>> 4
Run Code Online (Sandbox Code Playgroud)

Ale*_*lli 279

import threading

def printit():
  threading.Timer(5.0, printit).start()
  print "Hello, World!"

printit()

# continue with the rest of your code
Run Code Online (Sandbox Code Playgroud)

  • 确定,修复,请注意,您可能还想将`Timer`创建的线程设置为`守护进程`,以防您想通过完成主线程干净地中断程序 - 在这种情况下,您最好设置` t = threading.Timer`&c,然后`t.daemon = True`,然后``t.start()`就在`print"Hello,World!"之前. (26认同)
  • @YanKingYin:尝试运行它.它确实重复了.`printit`安排自己. (9认同)
  • 这不会最终达到递归函数限制吗? (9认同)
  • 这似乎根本不能解决问题......它不会每一秒重复. (6认同)
  • 我怎么阻止它? (5认同)
  • @AbelSurace 递归调用函数与启动新线程来运行函数不同。第一个增加了当前线程的递归深度,另一个没有。 (5认同)
  • @AbelSurace:它没有递归,因为它在另一个线程中启动_并且 `printit()` 一直持续到最后。你的世界是单线程的。欢迎来到多线程方面:-) (5认同)
  • 这也不起作用 - 它只会打印hello world然后执行我放在它之后的任何代码,而不重复自己. (4认同)
  • 这段代码完全错误,它不仅会导致“运行时错误:超出最大递归深度”,而且不会提供阻止它的方法。正如下面的一些答案,要走的路是使用调度程序 (3认同)
  • @AbelSurace 没有递归。但如果函数的运行时间超过了间隔,你可能会得到类似于叉子炸弹的东西。 (3认同)
  • @ Zonda,oops,忘了启动线程 - 现在编辑修复,对不起. (2认同)

Mes*_*ion 107

我对这个主题的谦虚态度,对Alex Martelli的回答的概括,以及start()和stop()控制:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False
Run Code Online (Sandbox Code Playgroud)

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!
Run Code Online (Sandbox Code Playgroud)

特征:

  • 仅限标准库,无外部依赖项
  • start()stop()即使定时器已经启动/停止,也可以安全地多次呼叫
  • 要调用的函数可以有位置和命名参数
  • 您可以interval随时更改,它将在下次运行后生效.同样的args,kwargs甚至是function!

  • 这很好,但请注意,它不会每n秒运行一次.相反,它运行一个n秒计时器,然后快速启动另一个n秒计时器......但是在下一个计时器启动之前有一个延迟.所以这并不是每n秒发射一次; 相反,它漂移,在工作之间略微超过n秒.运行作业以打印当前时间的1秒计时器产生:开始... 16:07:42.017682 16:07:43.023215 16:07:44.023626等 (5认同)
  • 是的,@ eraoul,它确实漂移,因为没有直接的方法来使用Python的标准库设置真正的*重复*计时器,所以我"链接"几个一次性计时器来模拟它.然而,漂移在第一次运行时为5ms,在下一次运行时为0.4ms,因此需要大约200到2500次运行才能漂移一整秒,这对您来说可能是也可能不重要.为了在数千次运行中获得更高的精度,可以根据RTC计算下一次运行,或者使用外部工具,例如`cron` (3认同)
  • @JossieCalderon:`sleep()`函数位于Python标准库的`time`模块中,除了`import`之外不需要任何额外的代码来使用它。但请注意,这是一次性阻塞调用,而不是 OP 请求的重复多线程计时器。基本上,“sleep()”只是一个暂停,而不是一个计时器。 (2认同)

Yan*_*Yin 25

保存自己的精神分裂症情节并使用Advanced Python调度程序:http: //pythonhosted.org/APScheduler

代码很简单:

from apscheduler.scheduler import Scheduler

sched = Scheduler()
sched.start()

def some_job():
    print "Every 10 seconds"

sched.add_interval_job(some_job, seconds = 10)

....
sched.shutdown()
Run Code Online (Sandbox Code Playgroud)

  • 任何看过这个答案的人(2014年11月)都应该意识到这一点,虽然是一个好的答案,但都是错的.以上评论对此进行了讨论.要在当前版本中添加作业,代码将显示为"sched.add_job(some_job,'interval',seconds = 10)`.看一下[文档](http://apscheduler.readthedocs.org/en/3.0/#) (11认同)
  • 首先,子模块称为"调度程序",带有"s".那里没有班级调度员.也许BackgroundScheduler?无论如何,这个答案是不完整的,不起作用. (4认同)
  • 已经有一段时间了,我想我粘贴了网络用户手册中的代码.上面的代码现在已得到纠正(仍未经过测试,但它来自我自己的工作代码,我不断使用它).PS:也许我们正在寻找不同的版本/模块?我确定我的行是"来自apscheduler.scheduler import Scheduler",大写字母S而不是复数. (2认同)
  • @MadsSkjern:我看到[2.1分支有'apscheduler.scheduler`(没有's`)模块](https://bitbucket.org/agronholm/apscheduler/src/11c5c1e7b8c33205293168e044d90b7f2c84a21b/apscheduler/?at=2.1).当前分支3没有. (2认同)

ava*_*riu 21

def update():
    import time
    while True:
        print 'Hello World!'
        time.sleep(5)
Run Code Online (Sandbox Code Playgroud)

那将作为一个功能运行.将while True:使它永远运行下去.如果需要,您可以随时将其从功能中取出.

  • 那么这只是循环运行.您没有在问题中指明您在此期间正在做其他事情,所以我认为这就是您所需要的. (16认同)
  • 注意:这不会每5秒_quite_运行一次.它将每隔'5 + N`秒运行,其中`N`是执行`while True:`和`time.sleep(5)`之间的时间.虽然对于'print'Hellow World来说这是微不足道的!'与5秒相比,对于其他场景来说这可能是不平凡的.我应该注意使用线程也不会完全达到5秒,但它会更接近. (7认同)
  • 不起作用;它只是永远运行,而我不能做任何其他事情。 (4认同)
  • 我知道我晚了 9 年,但这不是一个有效的解决方案,他特别说“......程序将遍历我拥有的任何代码,然后一旦过了 5 秒......”。 (3认同)

Six*_*Six 15

这是一个与APScheduler 3.00+兼容的简单示例:

# note that there are many other schedulers available
from apscheduler.schedulers.background import BackgroundScheduler

sched = BackgroundScheduler()

def some_job():
    print('Every 10 seconds')

# seconds can be replaced with minutes, hours, or days
sched.add_job(some_job, 'interval', seconds=10)
sched.start()

...

sched.shutdown()
Run Code Online (Sandbox Code Playgroud)

或者,您可以使用以下内容.与许多替代方案不同,此计时器将每n秒精确执行所需的代码(无论代码执行的时间如何).因此,如果您无法承受任何漂移,这是一个很好的选择.

import time
from threading import Event, Thread

class RepeatedTimer:

    """Repeat `function` every `interval` seconds."""

    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.start = time.time()
        self.event = Event()
        self.thread = Thread(target=self._target)
        self.thread.start()

    def _target(self):
        while not self.event.wait(self._time):
            self.function(*self.args, **self.kwargs)

    @property
    def _time(self):
        return self.interval - ((time.time() - self.start) % self.interval)

    def stop(self):
        self.event.set()
        self.thread.join()


# start timer
timer = RepeatedTimer(10, print, 'Hello world')

# stop timer
timer.stop()
Run Code Online (Sandbox Code Playgroud)


jfs*_*jfs 7

这是一个每秒都不创建新线程的版本n:

from threading import Event, Thread

def call_repeatedly(interval, func, *args):
    stopped = Event()
    def loop():
        while not stopped.wait(interval): # the first call is in `interval` secs
            func(*args)
    Thread(target=loop).start()    
    return stopped.set
Run Code Online (Sandbox Code Playgroud)

该事件用于停止重复:

cancel_future_calls = call_repeatedly(5, print, "Hello, World")
# do something else here...
cancel_future_calls() # stop future calls
Run Code Online (Sandbox Code Playgroud)

请参阅改进setInterval python的当前实现