Joh*_*ard 129 python multithreading
有没有办法,例如,Hello World!每n秒打印一次?例如,程序将通过我拥有的任何代码,然后一旦它有5秒(有time.sleep())它将执行该代码.我会使用它来更新文件,而不是打印Hello World.
例如:
startrepeat("print('Hello World')", .01) # Repeats print('Hello World') ever .01 seconds
for i in range(5):
    print(i)
>> Hello World!
>> 0
>> 1
>> 2
>> Hello World!
>> 3
>> Hello World!
>> 4
Ale*_*lli 279
import threading
def printit():
  threading.Timer(5.0, printit).start()
  print "Hello, World!"
printit()
# continue with the rest of your code
Mes*_*ion 107
我对这个主题的谦虚态度,对Alex Martelli的回答的概括,以及start()和stop()控制:
from threading import Timer
class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()
    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)
    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True
    def stop(self):
        self._timer.cancel()
        self.is_running = False
用法:
from time import sleep
def hello(name):
    print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!
特征:
start()stop()即使定时器已经启动/停止,也可以安全地多次呼叫interval随时更改,它将在下次运行后生效.同样的args,kwargs甚至是function!Yan*_*Yin 25
保存自己的精神分裂症情节并使用Advanced Python调度程序:http: //pythonhosted.org/APScheduler
代码很简单:
from apscheduler.scheduler import Scheduler
sched = Scheduler()
sched.start()
def some_job():
    print "Every 10 seconds"
sched.add_interval_job(some_job, seconds = 10)
....
sched.shutdown()
ava*_*riu 21
def update():
    import time
    while True:
        print 'Hello World!'
        time.sleep(5)
那将作为一个功能运行.将while True:使它永远运行下去.如果需要,您可以随时将其从功能中取出.
Six*_*Six 15
这是一个与APScheduler 3.00+兼容的简单示例:
# note that there are many other schedulers available
from apscheduler.schedulers.background import BackgroundScheduler
sched = BackgroundScheduler()
def some_job():
    print('Every 10 seconds')
# seconds can be replaced with minutes, hours, or days
sched.add_job(some_job, 'interval', seconds=10)
sched.start()
...
sched.shutdown()
或者,您可以使用以下内容.与许多替代方案不同,此计时器将每n秒精确执行所需的代码(无论代码执行的时间如何).因此,如果您无法承受任何漂移,这是一个很好的选择.
import time
from threading import Event, Thread
class RepeatedTimer:
    """Repeat `function` every `interval` seconds."""
    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.start = time.time()
        self.event = Event()
        self.thread = Thread(target=self._target)
        self.thread.start()
    def _target(self):
        while not self.event.wait(self._time):
            self.function(*self.args, **self.kwargs)
    @property
    def _time(self):
        return self.interval - ((time.time() - self.start) % self.interval)
    def stop(self):
        self.event.set()
        self.thread.join()
# start timer
timer = RepeatedTimer(10, print, 'Hello world')
# stop timer
timer.stop()
这是一个每秒都不创建新线程的版本n:
from threading import Event, Thread
def call_repeatedly(interval, func, *args):
    stopped = Event()
    def loop():
        while not stopped.wait(interval): # the first call is in `interval` secs
            func(*args)
    Thread(target=loop).start()    
    return stopped.set
该事件用于停止重复:
cancel_future_calls = call_repeatedly(5, print, "Hello, World")
# do something else here...
cancel_future_calls() # stop future calls
| 归档时间: | 
 | 
| 查看次数: | 315769 次 | 
| 最近记录: |