我想永远每60秒在Python中重复执行一个函数(就像目标C中的NSTimer一样).这段代码将作为守护进程运行,实际上就像使用cron每分钟调用python脚本一样,但不需要用户设置.
在这个关于用Python实现的cron的问题中,解决方案似乎实际上只是sleep() x秒.我不需要这样的高级功能,所以也许这样的东西可行
while True:
# Code executed here
time.sleep(60)
Run Code Online (Sandbox Code Playgroud)
这段代码有可预见的问题吗?
nos*_*klo 208
使用sched模块,它实现了一个通用的事件调度程序.
import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc):
print "Doing stuff..."
# do your stuff
s.enter(60, 1, do_something, (sc,))
s.enter(60, 1, do_something, (s,))
s.run()
Run Code Online (Sandbox Code Playgroud)
Dav*_*ove 154
只需将时间循环锁定到系统时钟即可.简单.
import time
starttime=time.time()
while True:
print "tick"
time.sleep(60.0 - ((time.time() - starttime) % 60.0))
Run Code Online (Sandbox Code Playgroud)
Aar*_*paa 67
您可能想要考虑Twisted,它是一个实现Reactor Pattern的Python网络库.
from twisted.internet import task, reactor
timeout = 60.0 # Sixty seconds
def doWork():
#do work here
pass
l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds
reactor.run()
Run Code Online (Sandbox Code Playgroud)
虽然"while True:sleep(60)"可能会工作Twisted可能已经实现了你最终需要的许多功能(如bobince所指出的守护进程,日志记录或异常处理),并且可能是一个更强大的解决方案
Mes*_*ion 56
如果你想要一种非阻塞方式来定期执行你的函数,而不是阻塞无限循环,我会使用一个线程计时器.这样,您的代码可以继续运行并执行其他任务,并且每隔n秒仍然会调用您的函数.我在很长的CPU /磁盘/网络密集型任务中使用这种技术来打印进度信息.
这是我在类似问题中发布的代码,包含start()和stop()控件:
from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
Run Code Online (Sandbox Code Playgroud)
用法:
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
Run Code Online (Sandbox Code Playgroud)
特征:
start()
stop()
即使定时器已经启动/停止,也可以安全地多次呼叫interval
随时更改,它将在下次运行后生效.同样的args
,kwargs
甚至是function
!Itx*_*aka 31
我认为更简单的方法是:
import time
def executeSomething():
#code here
time.sleep(60)
while True:
executeSomething()
Run Code Online (Sandbox Code Playgroud)
这样你的代码就会被执行,然后等待60秒然后它再次执行,等待,执行等......不需要复杂化的东西:D
era*_*oul 18
以下是MestreLion代码的更新,可以避免随着时间的推移而进行漫游:
import threading
import time
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self.next_call += self.interval
self._timer = threading.Timer(self.next_call - time.time(), self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
Run Code Online (Sandbox Code Playgroud)
Alf*_*lfe 18
import time, traceback
def every(delay, task):
next_time = time.time() + delay
while True:
time.sleep(max(0, next_time - time.time()))
try:
task()
except Exception:
traceback.print_exc()
# in production code you might want to have this instead of course:
# logger.exception("Problem while executing repetitive task.")
# skip tasks if we are behind schedule:
next_time += (time.time() - next_time) // delay * delay + delay
def foo():
print("foo", time.time())
every(5, foo)
Run Code Online (Sandbox Code Playgroud)
如果你想在不阻塞剩余代码的情况下执行此操作,可以使用它来让它在自己的线程中运行:
import threading
threading.Thread(target=lambda: every(5, foo)).start()
Run Code Online (Sandbox Code Playgroud)
该解决方案结合了其他解决方案中很少结合的几种功能:
threading.Timer
或其他)中出现任何问题时会很脆弱,这将终止链.即使问题的原因已经解决,也不会再发生进一步的执行.一个简单的循环和等待简单sleep()
相比,更加强大.next_time += delay
来代替.Lea*_*ple 13
我最终使用了schedule模块。API很好。
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
另一种灵活性解决方案是Apscheduler。
pip install apscheduler
Run Code Online (Sandbox Code Playgroud)
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
pass
sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds
sched.start()
Run Code Online (Sandbox Code Playgroud)
此外,apscheduler 提供了许多调度程序,如下所示。
BlockingScheduler:当调度程序是你的进程中唯一运行的东西时使用
BackgroundScheduler:当您不使用以下任何框架时使用,并且希望调度程序在您的应用程序内在后台运行
AsyncIOScheduler:如果您的应用程序使用 asyncio 模块,请使用
GeventScheduler:如果您的应用程序使用 gevent,请使用
TornadoScheduler:如果您正在构建 Tornado 应用程序,请使用
TwistedScheduler:在构建 Twisted 应用程序时使用
QtScheduler:如果您正在构建 Qt 应用程序,请使用
一段时间后我遇到了类似的问题.可能是http://cronus.readthedocs.org可能有帮助吗?
对于v0.2,以下代码段有效
import cronus.beat as beat
beat.set_rate(2) # 2 Hz
while beat.true():
# do some time consuming work here
beat.sleep() # total loop duration would be 0.5 sec
Run Code Online (Sandbox Code Playgroud)
如果漂移不是问题
import threading, time
def print_every_n_seconds(n=2):
while True:
print(time.ctime())
time.sleep(n)
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()
Run Code Online (Sandbox Code Playgroud)
其中异步输出。
#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018
Run Code Online (Sandbox Code Playgroud)
如果正在运行的任务需要相当长的时间,则间隔变为 2 秒 + 任务时间,因此如果您需要精确调度,那么这不适合您。
请注意该daemon=True
标志意味着该线程不会阻止应用程序关闭。例如,pytest
在运行测试等待此头停止后无限期挂起的问题。