Pon*_*dle 6 python multithreading garbage-collection memory-leaks thread-safety
我有一个函数永远不能同时从两个线程调用相同的值.为了实现这一点,我有一个为给定密钥defaultdict生成新的threading.Locks.因此,我的代码看起来类似于:
from collections import defaultdict
import threading
lock_dict = defaultdict(threading.Lock)
def f(x):
with lock_dict[x]:
print "Locked for value x"
Run Code Online (Sandbox Code Playgroud)
问题是我不知道如何在不再需要时从defaultdict 安全地删除锁.如果不这样做,我的程序会有一个内存泄漏,当f使用许多不同的x值调用时,它会变得明显.
我不能简单地del lock_dict[x]在f结束,因为在另一个线程等待锁的情况下,那么第二个线程将锁定那也没再与lock_dict [X]相关联的锁,因此两个线程可以同时最终调用f与x的相同值.
我会使用不同的方法:
fcond = threading.Condition()
fargs = set()
def f(x):
with fcond:
while x in fargs:
fcond.wait()
fargs.add(x) # this thread has exclusive rights to use `x`
# do useful stuff with x
# any other thread trying to call f(x) will
# block in the .wait above()
with fcond:
fargs.remove(x) # we're done with x
fcond.notify_all() # let blocked threads (if any) proceed
Run Code Online (Sandbox Code Playgroud)
条件有一个学习曲线,但一旦爬上它们就可以更容易地编写正确的线程安全,无竞赛代码.
@JimMischel在评论中询问orignal的使用是否defaultdict受到比赛的影响.好问题!
答案是 - 唉 - "你必须盯着你特定的Python实现".
假设CPython的实现:如果任何通过调用代码的defaultdict提供一个默认的调用Python代码,或C代码释放GIL(全局解释锁),然后加入2-(或更多)线程可能"同时"调用withlock_dict[x]用相同的x尚未在字典中,和:
x不在dict中,获得锁定,然后丢失其时间片(在设置xdict 之前).x不在dict中,并且还获得锁定.f(x).盯着3.4.0a4 +(当前的开发头)的源代码,defaultdict并且threading.Lock都是由不释放GIL的C代码实现的.我不记得早期版本是否做了或没,在不同的时间,实现全部或部分defaultdict或threading.LockPython编写的.
我建议的替代代码充满了用Python实现的东西(所有threading.Condition方法),但是在设计上是无竞争的 - 即使你使用Python的旧版本也用Python实现的集合(该集合只能在保护下访问)条件变量的锁定).
没有条件,这似乎要困难得多.在最初的方法中,我认为你需要保留一些想要使用的线程x,你需要一个锁来保护这些计数并保护字典.我为此提出的最好的代码是如此冗长,以至于把它放在上下文管理器中似乎是一种认可.要使用,请为每个需要它的函数创建一个参数锁定器:
farglocker = ArgLocker() # for function `f()`
Run Code Online (Sandbox Code Playgroud)
然后f()可以简单地编码身体:
def f(x):
with farglocker(x):
# only one thread at a time can run with argument `x`
Run Code Online (Sandbox Code Playgroud)
当然,条件方法也可以包含在上下文管理器中.这是代码:
import threading
class ArgLocker:
def __init__(self):
self.xs = dict() # maps x to (lock, count) pair
self.lock = threading.Lock()
def __call__(self, x):
return AllMine(self.xs, self.lock, x)
class AllMine:
def __init__(self, xs, lock, x):
self.xs = xs
self.lock = lock
self.x = x
def __enter__(self):
x = self.x
with self.lock:
xlock = self.xs.get(x)
if xlock is None:
xlock = threading.Lock()
xlock.acquire()
count = 0
else:
xlock, count = xlock
self.xs[x] = xlock, count + 1
if count: # x was already known - wait for it
xlock.acquire()
assert xlock.locked
def __exit__(self, *args):
x = self.x
with self.lock:
xlock, count = self.xs[x]
assert xlock.locked
assert count > 0
count -= 1
if count:
self.xs[x] = xlock, count
else:
del self.xs[x]
xlock.release()
Run Code Online (Sandbox Code Playgroud)
那么哪种方式更好?使用条件;-)这种方式"几乎显然是正确的",但是每个参数的锁定(LPA)方法有点令人头疼.LPA方法确实具有以下优点:当线程完成时x,允许继续执行的唯一线程是那些想要使用它的线程x; 使用条件,.notify_all()唤醒所有线程阻塞等待任何参数.但是除非尝试使用相同参数的线程之间存在非常激烈的争用,否则这并不重要:使用条件,唤醒的线程不等待x保持清醒只有足够长的时间才能看到这x in fargs是真的,然后立即阻止(.wait()).
| 归档时间: |
|
| 查看次数: |
190 次 |
| 最近记录: |