And*_*rea 10 python multithreading setinterval
在JavaScript中,我习惯于能够调用稍后要执行的函数,就像这样
function foo() {
alert('bar');
}
setTimeout(foo, 1000);
Run Code Online (Sandbox Code Playgroud)
这不会阻止其他代码的执行.
我不知道如何在Python中实现类似的东西.我可以睡觉
import time
def foo():
print('bar')
time.sleep(1)
foo()
Run Code Online (Sandbox Code Playgroud)
但这会阻止其他代码的执行.(实际上在我的情况下阻塞Python本身并不是问题,但我无法对该方法进行单元测试.)
我知道线程是为不同步执行而设计的,但我想知道是否更容易,类似setTimeout或setInterval存在.
jfs*_*jfs 14
要在延迟后执行函数或使用事件循环(无线程)在给定的秒数内重复函数,您可以:
#!/usr/bin/env python
from Tkinter import Tk
def foo():
print("timer went off!")
def countdown(n, bps, root):
if n == 0:
root.destroy() # exit mainloop
else:
print(n)
root.after(1000 / bps, countdown, n - 1, bps, root) # repeat the call
root = Tk()
root.withdraw() # don't show the GUI window
root.after(4000, foo) # call foo() in 4 seconds
root.after(0, countdown, 10, 2, root) # show that we are alive
root.mainloop()
print("done")
Run Code Online (Sandbox Code Playgroud)
10
9
8
7
6
5
4
3
timer went off!
2
1
done
Run Code Online (Sandbox Code Playgroud)
#!/usr/bin/env python
from gi.repository import GObject, Gtk
def foo():
print("timer went off!")
def countdown(n): # note: a closure could have been used here instead
if n[0] == 0:
Gtk.main_quit() # exit mainloop
else:
print(n[0])
n[0] -= 1
return True # repeat the call
GObject.timeout_add(4000, foo) # call foo() in 4 seconds
GObject.timeout_add(500, countdown, [10])
Gtk.main()
print("done")
Run Code Online (Sandbox Code Playgroud)
10
9
8
7
6
5
4
timer went off!
3
2
1
done
Run Code Online (Sandbox Code Playgroud)
#!/usr/bin/env python
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
def foo():
print("timer went off!")
def countdown(n):
if n[0] == 0:
reactor.stop() # exit mainloop
else:
print(n[0])
n[0] -= 1
reactor.callLater(4, foo) # call foo() in 4 seconds
LoopingCall(countdown, [10]).start(.5) # repeat the call in .5 seconds
reactor.run()
print("done")
Run Code Online (Sandbox Code Playgroud)
10
9
8
7
6
5
4
3
timer went off!
2
1
done
Run Code Online (Sandbox Code Playgroud)
Python 3.4 为异步IO 引入了新的临时API - asyncio模块:
#!/usr/bin/env python3.4
import asyncio
def foo():
print("timer went off!")
def countdown(n):
if n[0] == 0:
loop.stop() # end loop.run_forever()
else:
print(n[0])
n[0] -= 1
def frange(start=0, stop=None, step=1):
while stop is None or start < stop:
yield start
start += step #NOTE: loss of precision over time
def call_every(loop, seconds, func, *args, now=True):
def repeat(now=True, times=frange(loop.time() + seconds, None, seconds)):
if now:
func(*args)
loop.call_at(next(times), repeat)
repeat(now=now)
loop = asyncio.get_event_loop()
loop.call_later(4, foo) # call foo() in 4 seconds
call_every(loop, 0.5, countdown, [10]) # repeat the call every .5 seconds
loop.run_forever()
loop.close()
print("done")
Run Code Online (Sandbox Code Playgroud)
10
9
8
7
6
5
4
3
timer went off!
2
1
done
Run Code Online (Sandbox Code Playgroud)
注意:这些方法之间的界面和行为略有不同.
from threading import Timer
from time import sleep
def foo():
print "timer went off!"
t = Timer(4, foo)
t.start()
for i in range(11):
print i
sleep(.5)
Run Code Online (Sandbox Code Playgroud)
如果你想重复一遍,这里有一个简单的解决方案:不使用Timer,只需使用Thread但传递一个有点像这样的函数:
def call_delay(delay, repetitions, func, *args, **kwargs):
for i in range(repetitions):
sleep(delay)
func(*args, *kwargs)
Run Code Online (Sandbox Code Playgroud)
这不会做无限循环,因为这可能导致一个不会死的线程和其他不愉快的行为,如果做得不对.更复杂的方法可能会使用Event基于方法的方法,就像这个方法一样.