Calling condition.wait() inside a thread causes retrieval of any future to block on the main thread

13 python multithreading future wait threadpool

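I have tasks executing in a thread pool that share a reentrant read-write lock. These tasks return futures once their execution completes. When the lock is contended, the reentrant read-write lock waits on a condition.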

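The library I am using exposes a wait_for_any method to retrieve one or more completed futures from a set of tasks. However, even when one or more futures have already completed, wait_for_any does not return until all futures have completed. In addition, wait_for_any exposes a timeout parameter, but when it is set, it is ignored.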

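My question is: what am I doing wrong that could cause wait_for_any to block like this? Am I misunderstanding Python's implementation of condition wait and notify, and can these constructs block every thread in Python entirely?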
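For reference, here is a minimal sketch of the standard threading.Condition wait/notify pattern with native OS threads (the names worker, ready and events are illustrative). Only the thread that calls wait() is suspended; the main thread keeps running and later wakes the worker with notify() under the same condition.

```python
import threading

cond = threading.Condition()
ready = False
events = []


def worker():
    # wait() must be called with the condition's lock held; it releases
    # the lock while sleeping and reacquires it before returning.
    with cond:
        while not ready:  # re-check the predicate after every wakeup
            cond.wait()
    events.append("worker woke up")


t = threading.Thread(target=worker)
t.start()

# The worker's wait() does not block the main thread.
events.append("main thread still running")

with cond:
    ready = True
    cond.notify()  # wake one thread waiting on cond

t.join()
print(events)  # prints ['main thread still running', 'worker woke up']
```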

The library I am using is called Futurist and is maintained by the OpenStack Foundation. Here are links to the relevant class and method I use: GreenThreadPoolExecutor and waiters.wait_for_any

Here is the ReentrantReadWriteLock:

import logging
from threading import Condition, RLock

LOG = logging.getLogger(__name__)


class ReentrantReadWriteLock(object):
    def __init__(self):

        self._read_lock = RLock()
        self._write_lock = RLock()
        self._condition = Condition
        self._num_readers = 0
        self._wants_write = False

    def read_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._read_lock.acquire(blocking):
                int_lock = True
                LOG.warning("read internal lock acquired")
                while self._wants_write:
                    LOG.warning("read wants write true")
                    if not blocking:
                        LOG.warning("read non blocking")
                        return False
                    LOG.warning("read wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("read acquired lock")
                self._num_readers += 1
                return True
            LOG.warning("read internal lock failed")
            return False
        finally:
            if int_lock:
                self._read_lock.release()

    def write_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._write_lock.acquire(blocking):
                int_lock = True
                LOG.warning("write internal lock acquired")
                while self._num_readers > 0 or self._wants_write:
                    LOG.warning("write wants write true or num read")
                    if not blocking:
                        LOG.warning("write non blocking")
                        return False
                    LOG.warning("write wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("write acquired lock")
                self._wants_write = True
                return True
            LOG.warning("write internal lock failed")
            return False
        finally:
            if int_lock:
                self._write_lock.release()
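The lock above has no release methods and never calls notify(), so a thread that enters wait() has nothing that will wake it. As a point of comparison, here is a minimal sketch of a non-reentrant reader-writer lock in which every state change happens under a single Condition instance and every release calls notify_all(). The class name SimpleRWLock is hypothetical; the sketch favors readers (a steady stream of readers can starve a writer) and is not a drop-in replacement for the lock above.

```python
import threading


class SimpleRWLock:
    """Hypothetical, non-reentrant reader-writer lock built on one Condition."""

    def __init__(self):
        # A Condition *instance* guards all of the state below.
        self._cond = threading.Condition()
        self._num_readers = 0
        self._writer = False

    def read_acquire(self, blocking=True):
        with self._cond:
            while self._writer:
                if not blocking:
                    return False
                self._cond.wait()
            self._num_readers += 1
            return True

    def read_release(self):
        with self._cond:
            self._num_readers -= 1
            if self._num_readers == 0:
                # The last reader leaving may unblock a waiting writer.
                self._cond.notify_all()

    def write_acquire(self, blocking=True):
        with self._cond:
            while self._writer or self._num_readers > 0:
                if not blocking:
                    return False
                self._cond.wait()
            self._writer = True
            return True

    def write_release(self):
        with self._cond:
            self._writer = False
            # Wake both waiting readers and waiting writers.
            self._cond.notify_all()
```

The key design point is that waiting and notifying use the same Condition that protects the shared counters, so a release always reaches the threads blocked in wait().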

To test the lock and make it block indefinitely, I do the following:

import unittest

import futurist
from futurist import waiters


class TestReentrantReadWriteLock(unittest.TestCase):

    def get_read(self, rrwlock):
        return rrwlock.read_acquire()

    def get_write(self, rrwlock):
        return rrwlock.write_acquire()

    def test(self):
        self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
        rrwlock = ReentrantReadWriteLock()
        futures = []
        futures.append(self._threadpool.submit(self.get_read, rrwlock))
        futures.append(self._threadpool.submit(self.get_write, rrwlock))

        # Get the results and verify only one of the calls succeeded;
        # assert that the other call is still pending
        results = waiters.wait_for_any(futures)
        self.assertTrue(results[0].pop().result())
        self.assertEqual(1, len(results[1]))
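With native OS threads, the standard library's concurrent.futures.wait() with return_when=FIRST_COMPLETED is roughly the counterpart of what the test expects from waiters.wait_for_any. The sketch below (task names are illustrative) submits one task that finishes immediately and one that blocks in Condition.wait(); wait() returns as soon as the first one finishes, and the main thread is not held up by the blocked worker. It does not reproduce the GreenThreadPoolExecutor setup, which runs tasks as eventlet green threads rather than OS threads.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

cond = threading.Condition()
released = False


def fast_task():
    return "done"


def blocked_task():
    # Blocks in Condition.wait() until the main thread sets `released`
    # and notifies; this stands in for a task waiting on a contended lock.
    with cond:
        while not released:
            cond.wait()
    return "released"


executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(fast_task), executor.submit(blocked_task)]

# Returns as soon as at least one future is done; the timeout caps the wait.
done, not_done = wait(futures, timeout=5, return_when=FIRST_COMPLETED)
print(len(done), len(not_done))  # prints: 1 1

# Clean up: wake the blocked worker so the pool can shut down.
with cond:
    released = True
    cond.notify_all()
executor.shutdown(wait=True)
```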

In this example, the call results = waiters.wait_for_any(futures) blocks indefinitely. This has me thoroughly confused, and I hope someone can explain this behavior.

Update 2019-10-16 18:55:00 UTC: The blocking of the main thread is not limited to this ReentrantReadWriteLock implementation; it also happens with libraries such as readerwriterLock.

Update 2019-10-17 08:15:00 UTC: I have filed this as a bug report with the Futurist maintainers on Launchpad, since I believe this behavior is incorrect: Launchpad bug report

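Update 2019-10-20 09:02:00 UTC: I have since observed which call in the Futurist library stops progress: waiter.event.wait(timeout). A similar issue appears to have been filed against Python 3.3 and 3.4 and has since been closed: closed issue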

Update 2019-10-21 09:06:00 UTC: A patch for the Futurist library has been submitted in an attempt to fix this issue.

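Update 2019-10-22 08:03:00 UTC: The submitted patch does not fix the problem. Tracing where the call blocks, waiter.event.wait(timeout) ends up blocking on waiter.acquire() inside the wait function in Python's threading.py.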
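As a sanity check on the native-thread path: in CPython, Event.wait(timeout) delegates to Condition.wait(timeout), whose timed waiter.acquire() gives up once the timeout expires. The minimal sketch below shows Event.wait returning False after the timeout when nothing ever sets the event, which is the behavior one would expect from waiter.event.wait(timeout) when a timeout is passed.

```python
import threading
import time

event = threading.Event()

start = time.monotonic()
# Nobody calls event.set(), so this returns False once the timeout expires.
flag = event.wait(timeout=0.2)
elapsed = time.monotonic() - start

print(flag, round(elapsed, 2))
```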

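Update 2019-10-23 07:17:00 UTC: I created a small repository demonstrating what can be achieved with the native ThreadPoolExecutor and futures. I am starting to suspect this is a limitation caused by the GIL in CPython. The following code, using the same lock as above, shows what the demo does: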

from rrwlock import ReentrantReadWriteLock
from concurrent.futures import ThreadPoolExecutor

def read_lock(lock):
    lock.read_acquire()

def write_lock(lock):
    lock.write_acquire()

def main():
    local_lock = ReentrantReadWriteLock()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # First task will submit fine
        future = executor.submit(read_lock, local_lock)
        # Second one will block indefinitely
        future2 = executor.submit(write_lock, local_lock)

Update 2019-10-31 07:36:00 UTC: The reentrant read-write lock has been updated so that it works with Python 2.7; the latest version is the one in the demo repository on GitHub.

Furthermore, we found that the native thread-pool demo described on 2019-10-23 does not work because, after the last statement

future2 = executor.submit(write_lock, local_lock)

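the thread pool's __exit__ method is called. Naturally, this method (which calls shutdown(wait=True)) waits for all currently running threads to finish, and that cannot happen while one of them is stuck waiting for the lock. The example has been updated with a spin_for_any example: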

futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))

# This will loop indefinitely as one future will
# never be done, but it shouldn't block.
# Although similar to waiters.wait_for_any, this
# would rather be 'spin_for_any' since it does
# not use wait().
while len(futures) > 0:
    # Iterate over a copy so that removing a future doesn't skip the next one.
    for f in list(futures):
        if f.done():
            futures.remove(f)
            f.result()
            print("Future done")

This native Python concurrent.futures spin_for_any example works exactly as expected.
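The explanation in the 2019-10-31 update matches the documented behavior: leaving a with ThreadPoolExecutor(...) block calls shutdown(wait=True), which waits for pending futures to finish. A minimal sketch of the alternative, assuming you manage the executor's lifetime yourself, is to call shutdown(wait=False) so the main thread continues. This does not stop the blocked worker: at interpreter exit Python still joins the pool's threads, so the task has to be released eventually (here via a hypothetical gate event standing in for the lock).

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()


def blocked_task():
    gate.wait()  # stands in for a task stuck waiting on a lock
    return "finished"


executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(blocked_task)

# Returns immediately instead of joining the worker; ThreadPoolExecutor's
# __exit__ would call shutdown(wait=True) and block here.
executor.shutdown(wait=False)
print("main thread continues; done =", future.done())

# The worker thread is still alive; release it so the interpreter can exit.
gate.set()
print(future.result(timeout=5))
```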

Var*_*mar 2

In your ReentrantReadWriteLock class, try changing self._condition = Condition to

self._condition = Condition()
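To see why the parentheses matter: self._condition = Condition stores the class itself rather than an instance, so with self._condition: raises an exception before wait() is ever reached. A minimal check (the helper name uses_condition is illustrative):

```python
import threading


def uses_condition(cond):
    """Return True if `cond` can be entered with `with` and waited on."""
    try:
        with cond:
            # A timed wait returns False when nobody calls notify().
            cond.wait(timeout=0.05)
        return True
    except (AttributeError, TypeError):
        # Python < 3.11 raises AttributeError (no __enter__ on the class);
        # Python 3.11+ raises TypeError (no context manager protocol).
        return False


print(uses_condition(threading.Condition))    # the class, as in the question
print(uses_condition(threading.Condition()))  # an instance, as in the answer
# prints False, then True
```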