是itertools.tee()线程安全(Python)的结果

dan*_*n19 11 python thread-safety tee

假设我有这个Python代码:

from itertools import count, tee
original = count()     # just an example, can be another iterable
a, b = tee(original)
Run Code Online (Sandbox Code Playgroud)

问题是,如果我开始在一个线程中迭代"a"并且同时在另一个线程中迭代"b",会有任何问题吗?显然,a和b共享一些数据(原始可迭代,+一些额外的东西,内部缓冲区或其他东西).那么,a.next()和b.next()在访问这个共享数据时会进行适当的锁定吗?

Dro*_*ser 11

TL;博士

在CPython中,当且仅当原始迭代器在C/C++中实现时itertools.tee才是线​​程安全的,即不使用任何 python.

如果原始迭代器it是用python编写的,就像类实例或生成器一样,那么itertools.tee(it)不是线程安全的.在最好的情况下,你只会得到一个异常(你将会),并且在最糟糕的python中会崩溃.

而不是使用tee,这是一个线程安全的包装类和函数:

class safeteeobject(object):
    """tee object wrapped to make it thread-safe"""
    def __init__(self, teeobj, lock):
        self.teeobj = teeobj
        self.lock = lock
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            return next(self.teeobj)
    def __copy__(self):
        return safeteeobject(self.teeobj.__copy__(), self.lock)

def safetee(iterable, n=2):
    """tuple of n independent thread-safe iterators"""
    lock = Lock()
    return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))
Run Code Online (Sandbox Code Playgroud)

我现在将扩展(很多)on teeis is is is and thread-safe,以及为什么.

可以的例子

让我们运行一些代码(这是python 3代码,用于python 2 itertools.izip而不是zip具有相同的行为):

>>> from itertools import tee, count
>>> from threading import Thread

>>> def limited_sum(it):
...     s = 0
...     for elem, _ in zip(it, range(1000000)):
...         s += elem
...     print(elem)

>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
# prints 499999500000 twice, which is in fact the same 1+...+999999
Run Code Online (Sandbox Code Playgroud)

itertools.count完全用C++编写Modules/itertoolsmodule.c,在CPython项目的文件中,所以它工作得很好.

对于列表,元组,集合,范围,字典(键,值和项),collections.defaultdict(键,值和项)以及其他一些情况也是如此.

它不起作用的示例 - 生成器

一个非常简短的例子是使用生成器:

>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]

Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
ValueError: generator already executing
Run Code Online (Sandbox Code Playgroud)

是的,tee是用C++编写的,GIL确实一次执行一个字节的代码.但上面的例子表明,这还不足以确保线程安全.在某个地方,这就是发生的事情:

  1. 两个线程调用next了他们的tee_object实例相同的次数,
  2. 线程1调用next(a),
  3. 它需要获得一个新元素,所以线程1现在调用next(gen),
  4. gen是用python编写的.比方说,gen.__next__CPython 的第一个字节代码决定切换线程,
  5. 线程2恢复并调用next(b),
  6. 它需要获得一个新元素,因此它会调用 next(gen)
  7. 由于gen.__next__已经在线程1中运行,我们得到一个例外.

它不起作用的示例 - 迭代器对象

好吧,也许在内部使用发电机不是线程安全的tee.然后我们运行上面使用迭代器对象的代码的变体:

>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...     def __iter__(self):
...         return self
...     def __next__(self):
...         self.i -= 1
...         if self.i < 0:
...             raise StopIteration
...         return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
Run Code Online (Sandbox Code Playgroud)

上面的代码在Ubuntu,Windows 7和OSX上的python 2.7.13和3.6(可能还有所有cpython版本)中崩溃了.我不想透露原因,再说一步.

如果我在迭代器中使用锁怎么办?

也许上面的代码崩溃了,因为我们的迭代器本身不是线程安全的.让我们添加一个锁,看看会发生什么:

>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...         self.lock = Lock()
...     def __iter__(self):
...         return self
...     def __next__(self):
...         with self.lock:
...             self.i -= 1
...             if self.i < 0:
...                 raise StopIteration
...             return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
Run Code Online (Sandbox Code Playgroud)

在迭代器中添加一个锁是不足以使tee线程安全的.

为什么tee不是线程安全的

问题的关键是CPython 文件中的getitem方法.实现非常酷,优化可以节省RAM调用:返回"tee对象",每个对象都保存对头部的引用.这些反过来就像链接列表中的链接,但不是持有单个元素 - 它们保持57.这对我们的目的并不重要,但它就是它的本质.这是以下功能:teedataobjectModules/itertoolsmodule.cteeteeteedataobjectgetitemteedataobject

static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
    PyObject *value;

    assert(i < LINKCELLS);
    if (i < tdo->numread)
        value = tdo->values[i];
    else {
        /* this is the lead iterator, so fetch more data */
        assert(i == tdo->numread);
        value = PyIter_Next(tdo->it);
        if (value == NULL)
            return NULL;
        tdo->numread++;
        tdo->values[i] = value;
    }
    Py_INCREF(value);
    return value;
}
Run Code Online (Sandbox Code Playgroud)

当被要求提供元素时,teedataobject检查它是否已准备好.如果是,则返回它.如果没有,则调用next原始迭代器.这是,如果迭代器是用python编写的,代码可以挂起.所以这就是问题所在:

  1. 两个线程调用next的次数相同,
  2. 线程1调用next(a),C代码进入PyIter_Next上面的调用.比方说,next(gen)CPython 的第一个字节代码决定切换线程.
  3. 线程2调用next(b),因为它仍然需要一个新元素,C代码进入PyIter_Next调用,

此时,两个线程都在同一个地方,与相同的价值观itdo->numread.注意,这tdo->numread只是一个变量,用于跟踪teedataobject应该写入下一个的57个单元链接中的哪个位置.

  1. 线程2完成对其的调用PyIter_Next并返回一个元素.在某些时候,CPython决定再次切换线程,
  2. 线程1恢复,完成其调用PyIter_Next,然后运行两行:

        tdo->numread++;
        tdo->values[i] = value;
    
    Run Code Online (Sandbox Code Playgroud)
  3. 但是线程2已经设置好了tdo->values[i]!

这已经足以表明它tee不是线程安全的,因为我们丢失了线程2放入的值tdo->values[i].但这并不能解释崩溃的原因.

Say i是56.既然两个线程都调用了tdo->numread++,它现在达到58-高于57,分配的大小为tdo->values.在线程1继续移动之后,该对象tdo不再有引用并准备删除.这是明确的功能teedataobject:

static int
teedataobject_clear(teedataobject *tdo)
{
    int i;
    PyObject *tmp;

    Py_CLEAR(tdo->it);
    for (i=0 ; i<tdo->numread ; i++)
        Py_CLEAR(tdo->values[i]); // <----- PROBLEM!!!
    tmp = tdo->nextlink;
    tdo->nextlink = NULL;
    teedataobject_safe_decref(tmp);
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

在标有"问题"的行上,CPython将尝试清除tdo->values[57].这是崩溃发生的地方.好吧,有些时候.崩溃的地方不止一个,我只想展示一个.

现在你知道了 - itertools.tee不是线程安全的.

一种解决方案 - 外部锁定

__next__我们可以锁定,而不是锁定迭代器内部tee.__next__.这意味着teedataobject.__getitem__每次都会由单个线程调用整个方法.我在这个答案的开头做了一个简短的实现.它是替代品,tee它是线程安全的.它唯一没有实现的tee是 - 酸洗.由于锁不可选,因此添加它并非易事.但是,当然,它可以做到.

  • 伙计,我向你的这个答案鞠躬。非常出色。多谢。 (3认同)

Dun*_*can 0

在C-Python中,itertools.tee()它返回的迭代器是用C代码实现的。这意味着 GIL 应该保护它不被多个线程同时调用。它可能会正常工作,并且不会使解释器崩溃,但不能保证线程安全。

简而言之,不要冒险。