Tags: python, multithreading, future, wait, threadpool
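I have tasks running in a thread pool that share a reentrant read-write lock. When they finish executing, these tasks return futures. When the lock is contended, the reentrant read-write lock waits on a condition.
The library I am using exposes a `wait_for_any` method for retrieving one or more finished futures from a collection of tasks. However, even when one or more futures have already finished, `wait_for_any` does not return until all of the futures have finished. In addition, `wait_for_any` exposes a `timeout` parameter, which is ignored when set.
My question is: what am I doing wrong that could cause `wait_for_any` to block like this? Is my understanding of Python's implementation of condition wait and notify wrong, and do these constructs block every thread entirely in Python?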
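For comparison, the standard library's `concurrent.futures.wait()` with `return_when=FIRST_COMPLETED` is the closest analogue to `wait_for_any`: it is expected to return as soon as any future finishes, and to return with an empty done set once `timeout` expires. A minimal sketch of that expected behavior, assuming a `threading.Event` stands in for a task blocked on a contended lock (the helper names are illustrative, not from futurist):

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def wait_any_demo():
    """One task finishes immediately, one stays blocked until released."""
    gate = threading.Event()  # stand-in for a contended lock
    with ThreadPoolExecutor(max_workers=2) as pool:
        fast = pool.submit(lambda: "fast")
        slow = pool.submit(gate.wait)  # blocks until gate.set()
        done, not_done = wait([fast, slow], timeout=5,
                              return_when=FIRST_COMPLETED)
        gate.set()  # release the blocked task so the pool can shut down
    return done, not_done, fast, slow


def timeout_demo():
    """Nothing finishes, so wait() returns once the timeout expires."""
    gate = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as pool:
        blocked = pool.submit(gate.wait)
        done, not_done = wait([blocked], timeout=0.1,
                              return_when=FIRST_COMPLETED)
        gate.set()  # release it before the with-block waits on shutdown
    return done, not_done, blocked
```

In both cases the caller regains control while one task is still pending, which is the behavior the question expects from `wait_for_any`.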
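For reference, the standard `threading.Condition` semantics are: `wait()` releases the condition's lock and suspends only the calling thread, so other threads keep running and can acquire the same condition and `notify` it. A minimal sketch with ordinary OS threads (the function name is illustrative):

```python
import threading


def condition_demo():
    cond = threading.Condition()  # an instance: note the call
    state = {"ready": False}
    results = []

    def waiter():
        with cond:
            # wait_for() re-checks the predicate, so both spurious wakeups
            # and a notify that happened before we started waiting are handled
            results.append(cond.wait_for(lambda: state["ready"], timeout=5))

    t = threading.Thread(target=waiter)
    t.start()
    # The main thread is not blocked by the waiter: it can take the same
    # condition, change the shared state and wake the waiting thread.
    with cond:
        state["ready"] = True
        cond.notify_all()
    t.join()
    return results
```

`condition_demo()` returns `[True]` regardless of whether the waiter or the notifier runs first.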
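The library I am using is called Futurist and is maintained by the OpenStack Foundation. Here are links to the relevant class and method I use: GreenThreadPoolExecutor and waiters.wait_for_any
Here is the `ReentrantReadWriteLock`: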
```python
# Imports assumed; the original snippet omitted them.
import logging
from threading import Condition, RLock

LOG = logging.getLogger(__name__)


class ReentrantReadWriteLock(object):
    def __init__(self):
        self._read_lock = RLock()
        self._write_lock = RLock()
        self._condition = Condition
        self._num_readers = 0
        self._wants_write = False

    def read_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._read_lock.acquire(blocking):
                int_lock = True
                LOG.warning("read internal lock acquired")
                while self._wants_write:
                    LOG.warning("read wants write true")
                    if not blocking:
                        LOG.warning("read non blocking")
                        return False
                    LOG.warning("read wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("read acquired lock")
                self._num_readers += 1
                return True
            LOG.warning("read internal lock failed")
            return False
        finally:
            if int_lock:
                self._read_lock.release()

    def write_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._write_lock.acquire(blocking):
                int_lock = True
                LOG.warning("write internal lock acquired")
                while self._num_readers > 0 or self._wants_write:
                    LOG.warning("write wants write true or num read")
                    if not blocking:
                        LOG.warning("write non blocking")
                        return False
                    LOG.warning("write wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("write acquired lock")
                self._wants_write = True
                return True
            LOG.warning("write internal lock failed")
            return False
        finally:
            if int_lock:
                self._write_lock.release()
```
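A common way to build a read-write lock is to guard all shared state with a single `Condition` instance, wait on a predicate, and call `notify_all()` whenever that state changes. The lock shown above has no release methods and never calls `notify`, so a thread that reaches `wait()` has nothing to wake it. A minimal sketch of the single-condition pattern, assuming writer preference and *no* reentrancy (class and method names are illustrative and not taken from the original code):

```python
import threading


class ReadWriteLock:
    """Writer-preferring read-write lock built on one Condition (not reentrant)."""

    def __init__(self):
        self._cond = threading.Condition()  # one shared Condition *instance*
        self._readers = 0
        self._writer = False
        self._waiting_writers = 0

    def acquire_read(self, timeout=None):
        with self._cond:
            # Readers yield to an active or waiting writer (writer preference).
            ok = self._cond.wait_for(
                lambda: not self._writer and self._waiting_writers == 0, timeout)
            if ok:
                self._readers += 1
            return ok

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()  # a waiting writer may now proceed

    def acquire_write(self, timeout=None):
        with self._cond:
            self._waiting_writers += 1
            ok = self._cond.wait_for(
                lambda: not self._writer and self._readers == 0, timeout)
            self._waiting_writers -= 1
            if ok:
                self._writer = True
            else:
                self._cond.notify_all()  # readers queued behind us may proceed
            return ok

    def release_write(self):
        with self._cond:
            self._writer = False
            self._cond.notify_all()  # wake both readers and writers
```

Every state change that could satisfy another thread's predicate is followed by `notify_all()`, and every wait accepts an optional `timeout`, so a caller never has to block forever. Making it reentrant would additionally require tracking which threads hold the lock.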
To test the lock and make it block indefinitely, do the following:
```python
import unittest

import futurist
from futurist import waiters


class ReentrantReadWriteLockTest(unittest.TestCase):  # test-case name assumed
    def get_read(self, rrwlock):
        return rrwlock.read_acquire()

    def get_write(self, rrwlock):
        return rrwlock.write_acquire()

    def test(self):
        self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
        rrwlock = ReentrantReadWriteLock()
        futures = []
        futures.append(self._threadpool.submit(self.get_read, rrwlock))
        futures.append(self._threadpool.submit(self.get_write, rrwlock))
        # Get the results and verify only one of the calls succeeded;
        # assert that the other call is still pending
        results = waiters.wait_for_any(futures)
        # result() must be called; the bare bound method is always truthy
        self.assertTrue(results[0].pop().result())
        self.assertEqual(1, len(results[1]))
```
In this example, `results = waiters.wait_for_any(futures)` blocks indefinitely. This has me thoroughly confused, and I hope someone can explain this behavior.
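Update 2019-10-16 18:55:00 UTC: The blocking of the main thread is not specific to this `ReentrantReadWriteLock` implementation; it also happens when using libraries such as readerwriterlock.
Update 2019-10-17 08:15:00 UTC: I have filed this as a bug report with the futurist maintainers on Launchpad, because I believe this behavior is incorrect: Launchpad bug report
Update 2019-10-20 09:02:00 UTC: I have since identified which call inside the futurist library blocks progress: `waiter.event.wait(timeout)`. A similar issue appears to have been reported against Python 3.3 and 3.4 and has since been closed: closed issue
Update 2019-10-21 09:06:00 UTC: A patch has been submitted to the futurist library in an attempt to resolve this issue.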
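Update 2019-10-22 08:03:00 UTC: The submitted patch does not solve the problem. Tracing the `waiter.event.wait(timeout)` call, it blocks inside the `wait` function in Python's `threading.py`, at the call to `waiter.acquire()`.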
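In CPython, a thread blocked in a lock `acquire()` releases the GIL, so with ordinary OS threads one blocked worker should not stop another thread's `Event.wait(timeout)` from returning when the timeout expires. A minimal sketch to check that baseline on a given interpreter (the function name is illustrative; it uses plain `threading`, not green threads):

```python
import threading
import time


def timed_wait_with_blocked_worker(timeout=0.2):
    held = threading.Lock()
    held.acquire()  # hold the lock so the worker below blocks on it
    worker = threading.Thread(target=held.acquire)
    worker.start()  # worker blocks in acquire() until we release

    event = threading.Event()  # never set
    start = time.monotonic()
    signalled = event.wait(timeout)  # main thread still times out normally
    elapsed = time.monotonic() - start

    held.release()  # let the worker acquire the lock and exit
    worker.join()
    return signalled, elapsed
```

`signalled` is `False` and `elapsed` is roughly the requested timeout, showing that the blocked worker did not freeze the waiting thread.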
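Update 2019-10-23 07:17:00 UTC: I created a small repository demonstrating what can be achieved with the native `ThreadPoolExecutor` and futures. I am starting to suspect that this is a limitation caused by the GIL in CPython. The following code demonstrates the demo's behavior using the same lock as above: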
```python
from concurrent.futures import ThreadPoolExecutor

from rrwlock import ReentrantReadWriteLock


def read_lock(lock):
    lock.read_acquire()


def write_lock(lock):
    lock.write_acquire()


def main():
    local_lock = ReentrantReadWriteLock()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # First task will submit fine
        future = executor.submit(read_lock, local_lock)
        # Second one will block indefinitely
        future2 = executor.submit(write_lock, local_lock)
```
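Update 2019-10-31 07:36:00 UTC: The reentrant read-write lock has been updated so that it works with Python 2.7, and it now matches the latest version in the demo repository on GitHub.
Furthermore, we found that the native thread-pool demo described on 2019-10-23 does not work as intended: after the last statement, `future2 = executor.submit(write_lock, local_lock)`, the thread pool's `__exit__` method is called. That method waits for all running threads to finish (it calls `shutdown(wait=True)`), which cannot happen while the lock is held. The example has been updated to a spin_for_any version: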
```python
# Assumed context: runs inside main(), within the
# `with ThreadPoolExecutor(max_workers=2) as executor:` block
futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))
# This will loop indefinitely as one future will
# never be done but it shouldn't block.
# although similar to waiters.wait_for_any this
# would rather be 'spin_for_any' since it does
# not use wait().
while len(futures) > 0:
    for f in list(futures):  # iterate over a copy; the loop removes items
        if f.done():
            futures.remove(f)
            f.result()
            print("Future done")
```
This native Python `concurrent.futures` spin_for_any example works exactly as expected.
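A non-spinning alternative to the polling loop above is `concurrent.futures.as_completed()` with a `timeout`: it yields futures as they finish and raises `TimeoutError` while others are still pending, instead of calling `done()` in a busy loop. A minimal sketch, assuming a `threading.Event` stands in for the never-released lock (names are illustrative). The blocked task has to be released before `shutdown(wait=True)`, which is also what `__exit__` calls:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed


def first_results(timeout=0.2):
    gate = threading.Event()  # stand-in for a lock that is never released
    finished = []
    executor = ThreadPoolExecutor(max_workers=2)
    futures = [executor.submit(lambda: "reader"), executor.submit(gate.wait)]
    try:
        for f in as_completed(futures, timeout=timeout):
            finished.append(f.result())
    except TimeoutError:
        pass  # the blocked future is still pending; stop waiting, don't spin
    pending = [f for f in futures if not f.done()]
    gate.set()  # release the blocked task, otherwise shutdown would hang
    executor.shutdown(wait=True)
    return finished, len(pending)
```

`first_results()` returns `(["reader"], 1)`: the finished task is collected, and the caller learns that one task is still pending without burning CPU.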
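Answer: In your `ReentrantReadWriteLock` class, try changing `self._condition = Condition` to:

```python
self._condition = Condition()
```

Without the parentheses, `self._condition` is bound to `Condition` itself (a class in Python 3, a factory function in Python 2.7) rather than to a condition-variable instance, so `with self._condition:` raises an exception instead of acquiring a lock and waiting.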
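The difference can be checked directly: a `Condition()` instance supports `with` and `wait()`, while the bare `Condition` class does not. A small sketch (the helper name is illustrative; on Python 3 the exception is `AttributeError` before 3.11 and `TypeError` from 3.11 on):

```python
import threading


def try_condition(cond):
    """Enter `cond` and wait briefly; report what happened."""
    try:
        with cond:
            cond.wait(timeout=0.01)  # times out quickly; nobody notifies
        return "ok"
    except (AttributeError, TypeError) as exc:
        return type(exc).__name__
```

`try_condition(threading.Condition())` returns `"ok"`, while `try_condition(threading.Condition)` returns the name of the exception raised by the failed `with` statement.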