dan*_*n19 11 python thread-safety tee
假设我有这个Python代码:
from itertools import count, tee
original = count() # just an example, can be another iterable
a, b = tee(original)
Run Code Online (Sandbox Code Playgroud)
问题是,如果我开始在一个线程中迭代"a"并且同时在另一个线程中迭代"b",会有任何问题吗?显然,a和b共享一些数据(原始可迭代,+一些额外的东西,内部缓冲区或其他东西).那么,a.next()和b.next()在访问这个共享数据时会进行适当的锁定吗?
Dro*_*ser 11
在CPython中,当且仅当原始迭代器在C/C++中实现时itertools.tee才是线程安全的,即不使用任何 python.
如果原始迭代器it是用python编写的,就像类实例或生成器一样,那么itertools.tee(it)它不是线程安全的.在最好的情况下,你只会得到一个异常(你将会),并且在最糟糕的python中会崩溃.
而不是使用tee,这是一个线程安全的包装类和函数:
class safeteeobject(object):
"""tee object wrapped to make it thread-safe"""
def __init__(self, teeobj, lock):
self.teeobj = teeobj
self.lock = lock
def __iter__(self):
return self
def __next__(self):
with self.lock:
return next(self.teeobj)
def __copy__(self):
return safeteeobject(self.teeobj.__copy__(), self.lock)
def safetee(iterable, n=2):
"""tuple of n independent thread-safe iterators"""
lock = Lock()
return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))
Run Code Online (Sandbox Code Playgroud)
我现在将扩展(很多)on teeis is is is and thread-safe,以及为什么.
让我们运行一些代码(这是python 3代码,用于python 2 itertools.izip而不是zip具有相同的行为):
>>> from itertools import tee, count
>>> from threading import Thread
>>> def limited_sum(it):
... s = 0
... for elem, _ in zip(it, range(1000000)):
... s += elem
... print(elem)
>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
# prints 499999500000 twice, which is in fact the same 1+...+999999
Run Code Online (Sandbox Code Playgroud)
itertools.count完全用C++编写Modules/itertoolsmodule.c,在CPython项目的文件中,所以它工作得很好.
对于列表,元组,集合,范围,字典(键,值和项),collections.defaultdict(键,值和项)以及其他一些情况也是如此.
一个非常简短的例子是使用生成器:
>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Exception in thread Thread-10:
Traceback (most recent call last):
File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
self.run()
File "/usr/lib/python3.4/threading.py", line 868, in run
self._target(*self._args, **self._kwargs)
ValueError: generator already executing
Run Code Online (Sandbox Code Playgroud)
是的,tee是用C++编写的,GIL确实一次执行一个字节的代码.但上面的例子表明,这还不足以确保线程安全.在某个地方,这就是发生的事情:
next了他们的tee_object实例相同的次数,next(a),next(gen),gen是用python编写的.比方说,gen.__next__CPython 的第一个字节代码决定切换线程,next(b),next(gen)gen.__next__已经在线程1中运行,我们得到一个例外.好吧,也许在内部使用发电机不是线程安全的tee.然后我们运行上面使用迭代器对象的代码的变体:
>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... def __iter__(self):
... return self
... def __next__(self):
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
Run Code Online (Sandbox Code Playgroud)
上面的代码在Ubuntu,Windows 7和OSX上的python 2.7.13和3.6(可能还有所有cpython版本)中崩溃了.我不想透露原因,再说一步.
也许上面的代码崩溃了,因为我们的迭代器本身不是线程安全的.让我们添加一个锁,看看会发生什么:
>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... self.lock = Lock()
... def __iter__(self):
... return self
... def __next__(self):
... with self.lock:
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
Run Code Online (Sandbox Code Playgroud)
在迭代器中添加一个锁是不足以使tee线程安全的.
问题的关键是CPython 文件中的getitem方法.实现非常酷,优化可以节省RAM调用:返回"tee对象",每个对象都保存对头部的引用.这些反过来就像链接列表中的链接,但不是持有单个元素 - 它们保持57.这对我们的目的并不重要,但它就是它的本质.这是以下功能:teedataobjectModules/itertoolsmodule.cteeteeteedataobjectgetitemteedataobject
static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
PyObject *value;
assert(i < LINKCELLS);
if (i < tdo->numread)
value = tdo->values[i];
else {
/* this is the lead iterator, so fetch more data */
assert(i == tdo->numread);
value = PyIter_Next(tdo->it);
if (value == NULL)
return NULL;
tdo->numread++;
tdo->values[i] = value;
}
Py_INCREF(value);
return value;
}
Run Code Online (Sandbox Code Playgroud)
当被要求提供元素时,teedataobject检查它是否已准备好.如果是,则返回它.如果没有,则调用next原始迭代器.这是,如果迭代器是用python编写的,代码可以挂起.所以这就是问题所在:
next的次数相同,next(a),C代码进入PyIter_Next上面的调用.比方说,next(gen)CPython 的第一个字节代码决定切换线程.next(b),因为它仍然需要一个新元素,C代码进入PyIter_Next调用,此时,两个线程都在同一个地方,与相同的价值观i和tdo->numread.注意,这tdo->numread只是一个变量,用于跟踪teedataobject应该写入下一个的57个单元链接中的哪个位置.
PyIter_Next并返回一个元素.在某些时候,CPython决定再次切换线程,线程1恢复,完成其调用PyIter_Next,然后运行两行:
tdo->numread++;
tdo->values[i] = value;
Run Code Online (Sandbox Code Playgroud)但是线程2已经设置好了tdo->values[i]!
这已经足以表明它tee不是线程安全的,因为我们丢失了线程2放入的值tdo->values[i].但这并不能解释崩溃的原因.
Say i是56.既然两个线程都调用了tdo->numread++,它现在达到58-高于57,分配的大小为tdo->values.在线程1继续移动之后,该对象tdo不再有引用并准备删除.这是明确的功能teedataobject:
static int
teedataobject_clear(teedataobject *tdo)
{
int i;
PyObject *tmp;
Py_CLEAR(tdo->it);
for (i=0 ; i<tdo->numread ; i++)
Py_CLEAR(tdo->values[i]); // <----- PROBLEM!!!
tmp = tdo->nextlink;
tdo->nextlink = NULL;
teedataobject_safe_decref(tmp);
return 0;
}
Run Code Online (Sandbox Code Playgroud)
在标有"问题"的行上,CPython将尝试清除tdo->values[57].这是崩溃发生的地方.好吧,有些时候.崩溃的地方不止一个,我只想展示一个.
现在你知道了 - itertools.tee不是线程安全的.
__next__我们可以锁定,而不是锁定迭代器内部tee.__next__.这意味着teedataobject.__getitem__每次都会由单个线程调用整个方法.我在这个答案的开头做了一个简短的实现.它是替代品,tee它是线程安全的.它唯一没有实现的tee是 - 酸洗.由于锁不可选,因此添加它并非易事.但是,当然,它可以做到.
在C-Python中,itertools.tee()它返回的迭代器是用C代码实现的。这意味着 GIL 应该保护它不被多个线程同时调用。它可能会正常工作,并且不会使解释器崩溃,但不能保证线程安全。
简而言之,不要冒险。