sbr*_*her 3 python locking multiprocessing
我正在构建一个python模块,以从大量文本集中提取标签,尽管其结果是高质量的,但执行速度非常慢。我正在尝试通过使用多处理来加快进程,并且也一直有效,直到尝试引入锁,以便一次只有一个进程连接到我们的数据库。我一生都不知道如何进行这项工作-尽管进行了大量的搜索和调整,但我仍然得到了帮助PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed。这是令人讨厌的代码-在我尝试传递锁对象作为的参数之前,它工作得很好f。
def make_network(initial_tag, max_tags = 2, max_iter = 3):
manager = Manager()
lock = manager.Lock()
pool = manager.Pool(8)
# this is a very expensive function that I would like to parallelize
# over a list of tags. It involves a (relatively cheap) call to an external
# database, which needs a lock to avoid simultaneous queries. It takes a list
# of strings (tags) as its sole argument, and returns a list of sets with entries
# corresponding to the input list.
f = partial(get_more_tags, max_tags = max_tags, lock = lock)
def _recursively_find_more_tags(tags, level):
if level >= max_iter:
raise StopIteration
new_tags = pool.map(f, tags)
to_search = []
for i, s in zip(tags, new_tags):
for t in s:
joined = ' '.join(t)
print i + "|" + joined
to_search.append(joined)
try:
return _recursively_find_more_tags(to_search, level+1)
except StopIteration:
return None
_recursively_find_more_tags([initial_tag], 0)
Run Code Online (Sandbox Code Playgroud)
您的问题是锁对象不可腌制。在这种情况下,我可以为您找到两种可能的解决方案。
为避免这种情况,可以将锁变量设为全局变量。然后,您将能够在池处理函数中直接将其作为全局变量进行引用,而不必将其作为参数传递给池处理函数。之所以可行,是因为Python OS fork在创建池进程时使用了该机制,因此将创建池进程的进程的所有内容复制到它们。这是将锁传递给使用多处理程序包创建的Python进程的唯一方法。顺便说一句,不必Manager仅将类用于此锁定。进行此更改后,您的代码将如下所示:
import multiprocessing
from functools import partial
lock = None # Global definition of lock
pool = None # Global definition of pool
def make_network(initial_tag, max_tags=2, max_iter=3):
global lock
global pool
lock = multiprocessing.Lock()
pool = multiprocessing.Pool(8)
def get_more_tags():
global lock
pass
# this is a very expensive function that I would like to parallelize
# over a list of tags. It involves a (relatively cheap) call to an external
# database, which needs a lock to avoid simultaneous queries. It takes a
# list of strings (tags) as its sole argument, and returns a list of sets
# with entries corresponding to the input list.
f = partial(get_more_tags, max_tags=max_tags)
def _recursively_find_more_tags(tags, level):
global pool
if level >= max_iter:
raise StopIteration
new_tags = pool.map(f, tags)
to_search = []
for i, s in zip(tags, new_tags):
for t in s:
joined = ' '.join(t)
print(i + "|" + joined)
to_search.append(joined)
try:
return _recursively_find_more_tags(to_search, level + 1)
except StopIteration:
return None
_recursively_find_more_tags([initial_tag], 0)
Run Code Online (Sandbox Code Playgroud)在您的实际代码中,lock和pool变量可能是类实例变量。
multiprocessing.Process并将其通过a连接multiprocessing.Queue到您的每个池进程。此过程将负责运行数据库查询。您将使用队列来允许您的池进程将参数发送到管理数据库查询的进程。由于所有池进程将使用同一队列,因此对数据库的访问将自动进行序列化。额外的开销将来自数据库查询参数的酸洗/酸洗和查询响应。请注意,您可以将multiprocessing.Queue对象作为参数传递给池进程。还要注意,multiprocessing.Lock基于解决方案的方法不适Windows用于没有使用fork 语义。