Python线程:我可以同时在两个threading.Event()上睡觉吗?

kdt*_*kdt 20 python multithreading

如果我有两个threading.Event()对象,并且希望睡眠直到其中任何一个被设置,那么在python中有一种有效的方法吗?显然,我可以使用轮询/超时做一些事情,但我想让线程一直处于休眠状态,直到设置为一个,类似于select文件描述符的使用方式.

那么在下面的实现中,有效的非轮询实现wait_for_either会是什么样的呢?

a = threading.Event()
b = threading.Event()

wait_for_either(a, b)
Run Code Online (Sandbox Code Playgroud)

Cla*_*diu 20

这是一个非轮询的非过度线程解决方案:修改现有的Events以在它们发生更改时触发回调,并处理在该回调中设置新事件:

import threading

def or_set(self):
    self._set()
    self.changed()

def or_clear(self):
    self._clear()
    self.changed()

def orify(e, changed_callback):
    e._set = e.set
    e._clear = e.clear
    e.changed = changed_callback
    e.set = lambda: or_set(e)
    e.clear = lambda: or_clear(e)

def OrEvent(*events):
    or_event = threading.Event()
    def changed():
        bools = [e.is_set() for e in events]
        if any(bools):
            or_event.set()
        else:
            or_event.clear()
    for e in events:
        orify(e, changed)
    changed()
    return or_event
Run Code Online (Sandbox Code Playgroud)

样品用法:

def wait_on(name, e):
    print "Waiting on %s..." % (name,)
    e.wait()
    print "%s fired!" % (name,)

def test():
    import time

    e1 = threading.Event()
    e2 = threading.Event()

    or_e = OrEvent(e1, e2)

    threading.Thread(target=wait_on, args=('e1', e1)).start()
    time.sleep(0.05)
    threading.Thread(target=wait_on, args=('e2', e2)).start()
    time.sleep(0.05)
    threading.Thread(target=wait_on, args=('or_e', or_e)).start()
    time.sleep(0.05)

    print "Firing e1 in 2 seconds..."
    time.sleep(2)
    e1.set()
    time.sleep(0.05)

    print "Firing e2 in 2 seconds..."
    time.sleep(2)
    e2.set()
    time.sleep(0.05)
Run Code Online (Sandbox Code Playgroud)

结果是:

Waiting on e1...
Waiting on e2...
Waiting on or_e...
Firing e1 in 2 seconds...
e1 fired!or_e fired!

Firing e2 in 2 seconds...
e2 fired!
Run Code Online (Sandbox Code Playgroud)

这应该是线程安全的.欢迎任何评论.

编辑:哦,这是你的wait_for_either功能,虽然我编写代码的方式,最好制作和传递一个or_event.请注意,or_event不应手动设置或清除.

def wait_for_either(e1, e2):
    OrEvent(e1, e2).wait()
Run Code Online (Sandbox Code Playgroud)

  • 这很好!但是,我看到一个问题:如果你两次"同步"同一个事件,那么每当你设置或清除它时,你都会得到一个无限循环. (2认同)
  • 这真的很巧妙,但依赖于修改 Event 的私有实现,因此如果在未来版本中实现更改,则会严重中断。 (2认同)
  • 不仅如此,当我尝试用 e2 来 orify 单个事件 e1,然后再用事件 e3 来 orify 它时,它就被破坏了。或者简单地通过事件 e2 进行两次口述。这是因为您修改了现有事件。 (2认同)

Bre*_*dan 10

我认为标准库为这个问题提供了一个非常规范的解决方案,我在这个问题中没有看到这个问题:条件变量。您让主线程等待条件变量,并在每次收到通知时轮询事件集。只有在其中一个事件更新时才会通知它,因此没有浪费的轮询。这是一个 Python 3 示例:

from threading import Thread, Event, Condition
from time import sleep
from random import random

event1 = Event()
event2 = Event()
cond = Condition()

def thread_func(event, i):
    delay = random()
    print("Thread {} sleeping for {}s".format(i, delay))
    sleep(delay)

    event.set()
    with cond:
        cond.notify()

    print("Thread {} done".format(i))

with cond:
    Thread(target=thread_func, args=(event1, 1)).start()
    Thread(target=thread_func, args=(event2, 2)).start()
    print("Threads started")

    while not (event1.is_set() or event2.is_set()):
        print("Entering cond.wait")
        cond.wait()
        print("Exited cond.wait ({}, {})".format(event1.is_set(), event2.is_set()))

    print("Main thread done")
Run Code Online (Sandbox Code Playgroud)

示例输出:

Thread 1 sleeping for 0.31569427100177794s
Thread 2 sleeping for 0.486548134317051s
Threads started
Entering cond.wait
Thread 1 done
Exited cond.wait (True, False)
Main thread done
Thread 2 done
Run Code Online (Sandbox Code Playgroud)

请注意,没有额外的线程或不必要的轮询,您可以等待任意谓词变为真(例如,对于要设置的任何特定事件子集)。还有一个模式的wait_for包装器while (pred): cond.wait(),它可以使您的代码更易于阅读。


Iul*_*urt 5

一种解决方案(使用轮询)是对Event循环中的每个对象进行顺序等待

def wait_for_either(a, b):
    while True:
        if a.wait(tunable_timeout):
            break
        if b.wait(tunable_timeout):
            break
Run Code Online (Sandbox Code Playgroud)

我认为,如果将超时调整得足够好,结果将是可以的。


我能想到的最好的非轮询方法是在不同的线程中等待每个线程,并Event在主线程中设置一个共享对象,您将在此之后等待。

def repeat_trigger(waiter, trigger):
    waiter.wait()
    trigger.set()

def wait_for_either(a, b):
    trigger = threading.Event()
    ta = threading.Thread(target=repeat_trigger, args=(a, trigger))
    tb = threading.Thread(target=repeat_trigger, args=(b, trigger))
    ta.start()
    tb.start()
    # Now do the union waiting
    trigger.wait()
Run Code Online (Sandbox Code Playgroud)

非常有趣,因此我编写了先前解决方案的OOP版本:

class EventUnion(object):
    """Register Event objects and wait for release when any of them is set"""
    def __init__(self, ev_list=None):
        self._trigger = Event()
        if ev_list:
            # Make a list of threads, one for each Event
            self._t_list = [
                Thread(target=self._triggerer, args=(ev, ))
                for ev in ev_list
            ]
        else:
            self._t_list = []

    def register(self, ev):
        """Register a new Event"""
        self._t_list.append(Thread(target=self._triggerer, args=(ev, )))

    def wait(self, timeout=None):
        """Start waiting until any one of the registred Event is set"""
        # Start all the threads
        map(lambda t: t.start(), self._t_list)
        # Now do the union waiting
        return self._trigger.wait(timeout)

    def _triggerer(self, ev):
        ev.wait()
        self._trigger.set()
Run Code Online (Sandbox Code Playgroud)