kdt*_*kdt 20 python multithreading
如果我有两个threading.Event()对象,并且希望睡眠直到其中任何一个被设置,那么在python中有一种有效的方法吗?显然,我可以使用轮询/超时做一些事情,但我想让线程一直处于休眠状态,直到设置为一个,类似于select文件描述符的使用方式.
那么在下面的实现中,有效的非轮询实现wait_for_either会是什么样的呢?
a = threading.Event()
b = threading.Event()
wait_for_either(a, b)
Run Code Online (Sandbox Code Playgroud)
Cla*_*diu 20
这是一个非轮询的非过度线程解决方案:修改现有的Events以在它们发生更改时触发回调,并处理在该回调中设置新事件:
import threading
def or_set(self):
self._set()
self.changed()
def or_clear(self):
self._clear()
self.changed()
def orify(e, changed_callback):
e._set = e.set
e._clear = e.clear
e.changed = changed_callback
e.set = lambda: or_set(e)
e.clear = lambda: or_clear(e)
def OrEvent(*events):
or_event = threading.Event()
def changed():
bools = [e.is_set() for e in events]
if any(bools):
or_event.set()
else:
or_event.clear()
for e in events:
orify(e, changed)
changed()
return or_event
Run Code Online (Sandbox Code Playgroud)
样品用法:
def wait_on(name, e):
print "Waiting on %s..." % (name,)
e.wait()
print "%s fired!" % (name,)
def test():
import time
e1 = threading.Event()
e2 = threading.Event()
or_e = OrEvent(e1, e2)
threading.Thread(target=wait_on, args=('e1', e1)).start()
time.sleep(0.05)
threading.Thread(target=wait_on, args=('e2', e2)).start()
time.sleep(0.05)
threading.Thread(target=wait_on, args=('or_e', or_e)).start()
time.sleep(0.05)
print "Firing e1 in 2 seconds..."
time.sleep(2)
e1.set()
time.sleep(0.05)
print "Firing e2 in 2 seconds..."
time.sleep(2)
e2.set()
time.sleep(0.05)
Run Code Online (Sandbox Code Playgroud)
结果是:
Waiting on e1...
Waiting on e2...
Waiting on or_e...
Firing e1 in 2 seconds...
e1 fired!or_e fired!
Firing e2 in 2 seconds...
e2 fired!
Run Code Online (Sandbox Code Playgroud)
这应该是线程安全的.欢迎任何评论.
编辑:哦,这是你的wait_for_either功能,虽然我编写代码的方式,最好制作和传递一个or_event.请注意,or_event不应手动设置或清除.
def wait_for_either(e1, e2):
OrEvent(e1, e2).wait()
Run Code Online (Sandbox Code Playgroud)
Bre*_*dan 10
我认为标准库为这个问题提供了一个非常规范的解决方案,我在这个问题中没有看到这个问题:条件变量。您让主线程等待条件变量,并在每次收到通知时轮询事件集。只有在其中一个事件更新时才会通知它,因此没有浪费的轮询。这是一个 Python 3 示例:
from threading import Thread, Event, Condition
from time import sleep
from random import random
event1 = Event()
event2 = Event()
cond = Condition()
def thread_func(event, i):
delay = random()
print("Thread {} sleeping for {}s".format(i, delay))
sleep(delay)
event.set()
with cond:
cond.notify()
print("Thread {} done".format(i))
with cond:
Thread(target=thread_func, args=(event1, 1)).start()
Thread(target=thread_func, args=(event2, 2)).start()
print("Threads started")
while not (event1.is_set() or event2.is_set()):
print("Entering cond.wait")
cond.wait()
print("Exited cond.wait ({}, {})".format(event1.is_set(), event2.is_set()))
print("Main thread done")
Run Code Online (Sandbox Code Playgroud)
示例输出:
Thread 1 sleeping for 0.31569427100177794s
Thread 2 sleeping for 0.486548134317051s
Threads started
Entering cond.wait
Thread 1 done
Exited cond.wait (True, False)
Main thread done
Thread 2 done
Run Code Online (Sandbox Code Playgroud)
请注意,没有额外的线程或不必要的轮询,您可以等待任意谓词变为真(例如,对于要设置的任何特定事件子集)。还有一个模式的wait_for包装器while (pred): cond.wait(),它可以使您的代码更易于阅读。
一种解决方案(使用轮询)是对Event循环中的每个对象进行顺序等待
def wait_for_either(a, b):
while True:
if a.wait(tunable_timeout):
break
if b.wait(tunable_timeout):
break
Run Code Online (Sandbox Code Playgroud)
我认为,如果将超时调整得足够好,结果将是可以的。
我能想到的最好的非轮询方法是在不同的线程中等待每个线程,并Event在主线程中设置一个共享对象,您将在此之后等待。
def repeat_trigger(waiter, trigger):
waiter.wait()
trigger.set()
def wait_for_either(a, b):
trigger = threading.Event()
ta = threading.Thread(target=repeat_trigger, args=(a, trigger))
tb = threading.Thread(target=repeat_trigger, args=(b, trigger))
ta.start()
tb.start()
# Now do the union waiting
trigger.wait()
Run Code Online (Sandbox Code Playgroud)
非常有趣,因此我编写了先前解决方案的OOP版本:
class EventUnion(object):
"""Register Event objects and wait for release when any of them is set"""
def __init__(self, ev_list=None):
self._trigger = Event()
if ev_list:
# Make a list of threads, one for each Event
self._t_list = [
Thread(target=self._triggerer, args=(ev, ))
for ev in ev_list
]
else:
self._t_list = []
def register(self, ev):
"""Register a new Event"""
self._t_list.append(Thread(target=self._triggerer, args=(ev, )))
def wait(self, timeout=None):
"""Start waiting until any one of the registred Event is set"""
# Start all the threads
map(lambda t: t.start(), self._t_list)
# Now do the union waiting
return self._trigger.wait(timeout)
def _triggerer(self, ev):
ev.wait()
self._trigger.set()
Run Code Online (Sandbox Code Playgroud)