use*_*820 6 python multithreading gil spacy
参考这篇文章 多线程NLP与Spacy管道谈论,
from spacy.attrs import *
# All strings mapped to integers, for easy export to numpy
np_array = doc.to_array([LOWER, POS, ENT_TYPE, IS_ALPHA])
from reddit_corpus import RedditComments
reddit = RedditComments('/path/to/reddit/corpus')
# Parse a stream of documents, with multi-threading (no GIL!)
# Processes over 100,000 tokens per second.
for doc in nlp.pipe(reddit.texts, batch_size=10000, n_threads=4):
# Multi-word expressions, such as names, dates etc
# can be merged into single tokens
for ent in doc.ents:
ent.merge(ent.root.tag_, ent.text, ent.ent_type_)
# Efficient, lossless serialization --- all annotations
# saved, same size as uncompressed text
byte_string = doc.to_bytes()
Run Code Online (Sandbox Code Playgroud)
syl*_*sm_ 13
我需要在此上写一篇合适的博文.tl; dr是spaCy是用Cython实现的,Cython是一种类似Python的语言,可以转换成C或C++,最终产生Python扩展.您可以在此处阅读有关使用Cython发布GIL的更多信息:
http://docs.cython.org/src/userguide/parallelism.html
这是spaCy中.pipe方法的实现:
https://github.com/spacy-io/spaCy/blob/master/spacy/syntax/parser.pyx#L135
def pipe(self, stream, int batch_size=1000, int n_threads=2):
cdef Pool mem = Pool()
cdef TokenC** doc_ptr = <TokenC**>mem.alloc(batch_size, sizeof(TokenC*))
cdef int* lengths = <int*>mem.alloc(batch_size, sizeof(int))
cdef Doc doc
cdef int i
cdef int nr_class = self.moves.n_moves
cdef int nr_feat = self.model.nr_feat
cdef int status
queue = []
for doc in stream:
doc_ptr[len(queue)] = doc.c
lengths[len(queue)] = doc.length
queue.append(doc)
if len(queue) == batch_size:
with nogil:
for i in cython.parallel.prange(batch_size, num_threads=n_threads):
status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
if status != 0:
with gil:
sent_str = queue[i].text
raise ValueError("Error parsing doc: %s" % sent_str)
PyErr_CheckSignals()
for doc in queue:
self.moves.finalize_doc(doc)
yield doc
queue = []
batch_size = len(queue)
with nogil:
for i in cython.parallel.prange(batch_size, num_threads=n_threads):
status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
if status != 0:
with gil:
sent_str = queue[i].text
raise ValueError("Error parsing doc: %s" % sent_str)
PyErr_CheckSignals()
for doc in queue:
self.moves.finalize_doc(doc)
yield doc
Run Code Online (Sandbox Code Playgroud)
多线程的实际机制非常简单,因为NLP(通常)令人尴尬地并行 - 每个文档都是独立解析的,所以我们只需要在文本流上进行一个prange循环.
但是,以多线程方式实现解析器非常困难.要有效地使用多线程,您需要释放GIL,而不是重新获取它.这意味着不使用Python对象,不会引发异常等.
当你创建一个Python对象---让我们说一个列表---你需要增加它的引用计数,它是全局存储的.这意味着获得GIL.没有办法解决这个问题.但是如果你在C扩展中并且你只想在堆栈上放一个整数,或者调用malloc或free,你就不需要获取GIL了.因此,如果您在该级别编写程序,仅使用C和C++构造,则可以释放GIL.
我已经在Cython中编写了统计解析器几年了.(在spaCy之前,我有一个实验用于我的学术研究.)在没有GIL的情况下编写整个解析循环很难.到2015年底,我将机器学习,哈希表,外部解析循环和大多数特征提取作为nogil代码.但是状态对象具有复杂的接口,并且被实现为cdef类.我无法创建此对象或将其存储在容器中而不获取GIL.
当我想出一种在Cython中编写C++类的无证方法时,突破就来了.这让我可以挖空控制解析器状态的现有cdef类.我通过方法将其接口代理到内部C++类.这样我可以保持代码正常工作,并确保我没有在功能计算中引入任何微妙的错误.
你可以在这里看到内部类:https://github.com/spacy-io/spaCy/blob/master/spacy/syntax/_state.pxd
如果您浏览此文件的git历史记录,您可以看到我实现.pipe方法的补丁.
| 归档时间: |
|
| 查看次数: |
1543 次 |
| 最近记录: |