垃圾 - 一旦没有线程要求它就收集锁

Pon*_*dle 6 python multithreading garbage-collection memory-leaks thread-safety

我有一个函数永远不能同时从两个线程调用相同的值.为了实现这一点,我有一个为给定密钥defaultdict生成新的threading.Locks.因此,我的代码看起来类似于:

from collections import defaultdict
import threading

lock_dict = defaultdict(threading.Lock)
def f(x):
    with lock_dict[x]:
        print "Locked for value x"
Run Code Online (Sandbox Code Playgroud)

问题是我不知道如何在不再需要时从defaultdict 安全地删除锁.如果不这样做,我的程序会有一个内存泄漏,当f使用许多不同的x值调用时,它会变得明显.

我不能简单地del lock_dict[x]在f结束,因为在另一个线程等待锁的情况下,那么第二个线程将锁定那也没再与lock_dict [X]相关联的锁,因此两个线程可以同时最终调用f与x的相同值.

Tim*_*ers 7

我会使用不同的方法:

fcond = threading.Condition()
fargs = set()

def f(x):
    with fcond:
        while x in fargs:
            fcond.wait()
        fargs.add(x)  # this thread has exclusive rights to use `x`

    # do useful stuff with x
    # any other thread trying to call f(x) will
    # block in the .wait above()

    with fcond:
        fargs.remove(x)      # we're done with x
        fcond.notify_all()   # let blocked threads (if any) proceed
Run Code Online (Sandbox Code Playgroud)

条件有一个学习曲线,但一旦爬上它们就可以更容易地编写正确的线程安全,无竞赛代码.

线程安全的原始代码

@JimMischel在评论中询问orignal的使用是否defaultdict受到比赛的影响.好问题!

答案是 - 唉 - "你必须盯着你特定的Python实现".

假设CPython的实现:如果任何通过调用代码的defaultdict提供一个默认的调用Python代码,或C代码释放GIL(全局解释锁),然后加入2-(或更多)线程可能"同时"调用withlock_dict[x]用相同的x尚未在字典中,和:

  1. 线程1看到x不在dict中,获得锁定,然后丢失其时间片(在设置xdict 之前).
  2. 线程2看到x不在dict中,并且还获得锁定.
  3. 其中一个线程的锁定在dict中结束,但两个线程都执行f(x).

盯着3.4.0a4 +(当前的开发头)的源代码,defaultdict并且threading.Lock都是由不释放GIL的C代码实现的.我不记得早期版本是否做了或没,在不同的时间,实现全部或部分defaultdictthreading.LockPython编写的.

我建议的替代代码充满了用Python实现的东西(所有threading.Condition方法),但是在设计上是无竞争的 - 即使你使用Python的旧版本也用Python实现的集合(该集合只能在保护下访问)条件变量的锁定).

每个参数一个锁定

没有条件,这似乎要困难得多.在最初的方法中,我认为你需要保留一些想要使用的线程x,你需要一个锁来保护这些计数并保护字典.我为此提出的最好的代码是如此冗长,以至于把它放在上下文管理器中似乎是一种认可.要使用,请为每个需要它的函数创建一个参数锁定器:

farglocker = ArgLocker() # for function `f()`
Run Code Online (Sandbox Code Playgroud)

然后f()可以简单地编码身体:

def f(x):
    with farglocker(x):
        # only one thread at a time can run with argument `x`
Run Code Online (Sandbox Code Playgroud)

当然,条件方法也可以包含在上下文管理器中.这是代码:

import threading

class ArgLocker:
    def __init__(self):
        self.xs = dict() # maps x to (lock, count) pair
        self.lock = threading.Lock()

    def __call__(self, x):
        return AllMine(self.xs, self.lock, x)

class AllMine:
    def __init__(self, xs, lock, x):
        self.xs = xs
        self.lock = lock
        self.x = x

    def __enter__(self):
        x = self.x
        with self.lock:
            xlock = self.xs.get(x)
            if xlock is None:
                xlock = threading.Lock()
                xlock.acquire()
                count = 0
            else:
                xlock, count = xlock
            self.xs[x] = xlock, count + 1

        if count: # x was already known - wait for it
            xlock.acquire()
        assert xlock.locked

    def __exit__(self, *args):
        x = self.x
        with self.lock:
            xlock, count = self.xs[x]
            assert xlock.locked
            assert count > 0
            count -= 1
            if count:
                self.xs[x] = xlock, count
            else:
                del self.xs[x]
            xlock.release()
Run Code Online (Sandbox Code Playgroud)

那么哪种方式更好?使用条件;-)这种方式"几乎显然是正确的",但是每个参数的锁定(LPA)方法有点令人头疼.LPA方法确实具有以下优点:当线程完成时x,允许继续执行的唯一线程是那些想要使用它的线程x; 使用条件,.notify_all()唤醒所有线程阻塞等待任何参数.但是除非尝试使用相同参数的线程之间存在非常激烈的争用,否则这并不重要:使用条件,唤醒的线程不等待x保持清醒只有足够长的时间才能看到这x in fargs是真的,然后立即阻止(.wait()).