AJS*_*yth 3 python set multiprocessing
我在Python 2.7中使用多处理来处理非常大的数据集.当每个进程运行时,它会将整数添加到共享的mp.Manager.Queue(),但前提是其他进程尚未添加相同的整数.由于你不能对队列进行"in"式成员资格测试,我这样做的方法是检查每个int是否为共享mp.Manager.list()中的成员资格.该列表最终将有大约3000万个条目,因此成员资格测试将非常缓慢,从而使多处理的优势无效.
这是我正在做的简化版本:
import multiprocessing as mp
def worker(shared_list, out_q, lock):
# Do some processing and get an integer
result_int = some_other_code()
# Use a lock to ensure nothing is added to the list in the meantime
lock.acquire()
# This lookup can take forever when the list is large
if result_int not in shared_list:
out_q.put(result_int)
shared_list.append(result_int)
lock.release()
manager = mp.Manager()
shared_list = manager.list()
lock = manager.lock()
out_q = manager.Queue()
for i in range(8):
p = mp.Process(target=worker, args=(shared_list, out_q, lock))
p.start()
Run Code Online (Sandbox Code Playgroud)
我之前尝试使用set()而不是mp.Manager.list(),但似乎每个进程都有自己的内存空间,因此当我更新集合时,它不会跨进程同步.因此,我改用现在的方法.
以下是我之前尝试使用set()的方法:将多处理导入为mp
def worker(shared_set, out_q, lock):
# Do some processing and get an integer
result_int = some_other_code()
# Use a lock to ensure nothing is added to the set in the meantime
lock.acquire()
# This lookup is fast, but the set doesn't reflect additions made by other processes.
if result_int not in shared_set:
out_q.put(result_int)
shared_set.add(result_int)
lock.release()
manager = mp.Manager()
lock = manager.lock()
out_q = manager.Queue()
# This set will NOT synchronize between processes
shared_set = set()
for i in range(8):
p = mp.Process(target=worker, args=(shared_set, out_q, lock))
p.start()
Run Code Online (Sandbox Code Playgroud)
注意:这些示例未经测试,只是代表我的代码的相关部分.
有没有办法跨进程共享集合,或者更快地进行成员资格查找?
编辑:更多信息:out_q由另一个将数据写入单个输出文件的进程使用.没有重复.如果我生成一个整数并且发现它是重复的,那么该过程需要返回并生成下一个最佳整数.
一个明显的调整是使用mp.Manager.dict()而不是集合,并使用任意值(例如,设置the_dict[result_int] = 1以指示集合中的成员资格).顺便说一下,这就是"每个人"在Python添加set类型之前实现集合的方式,甚至现在的dicts和sets都是由基本相同的代码实现的.
后来添加:我承认我不明白为什么你在原始代码中同时使用了一个集合和一个列表,因为集合的密钥与列表的内容相同.如果输入顺序不重要,为什么不完全忘记列表?然后,您还可以删除原始所需的锁定层,以使设置和列表保持同步.
充实,用dict建议,整个功能将变得像:
def worker(shared_dict):
# Do some processing and get an integer
result_int = some_other_code()
shared_dict[result_int] = 1
Run Code Online (Sandbox Code Playgroud)
其他进程可以shared_dict.pop()一次获得一个值(尽管如此,不,他们不能.pop()像对队列一样等待.get()).
还有一个:考虑使用本地(进程本地)集合?他们跑得快得多.然后每个工作人员不会添加它知道的任何重复项,但跨进程可能存在重复项.你的代码没有给出关于out_q消费者做什么的任何提示,但如果只有一个,那么一个本地集也可以清除跨进程重复.或者也许内存负担太高了?不能从这里猜测;-)
我将建议一种不同的方法:根本不使用mp.Manager.大多数时候,我看到人们使用它,他们后悔,因为它没有做他们认为正在做的事情.他们的想法是:它提供物理共享的对象.它正在做什么:它提供语义共享的对象.在物理上,它们生活在另一个中,对象的覆盖,过程和操作被转发到后一个过程,在那里它们由该过程在它自己的地址空间中执行.它根本没有物理共享.因此,尽管它非常方便,但即使是最简单的操作也会有大量的进程间开销.
因此,我建议在一个过程中使用单个普通集合,这将是删除重复项的唯一代码.工作流程生成整数而不关心重复 - 它们只是通过了整数.一个mp.Queue是好的(再次,没有真正的需要mp.Manager.Queue).
像这样,这是一个完整的可执行程序:
N = 20
def worker(outq):
from random import randrange
from time import sleep
while True:
i = randrange(N)
outq.put(i)
sleep(0.1)
def uniqueifier(inq, outq):
seen = set()
while True:
i = inq.get()
if i not in seen:
seen.add(i)
outq.put(i)
def consumer(inq):
for _ in range(N):
i = inq.get()
print(i)
if __name__ == "__main__":
import multiprocessing as mp
q1 = mp.Queue()
q2 = mp.Queue()
consume = mp.Process(target=consumer, args=(q2,))
consume.start()
procs = [mp.Process(target=uniqueifier, args=(q1, q2))]
for _ in range(4):
procs.append(mp.Process(target=worker, args=(q1,)))
for p in procs:
p.start()
consume.join()
for p in procs:
p.terminate()
Run Code Online (Sandbox Code Playgroud)
传递的第二个队列uniqueifier扮演原始队列的角色:它只传递唯一的整数.没有尝试"共享记忆",因此没有支付任何费用.唯一的进程间通信是通过简单明确的mp.Queue操作.只有一组,因为它没有以任何方式共享它尽可能快地运行.
实际上,这只是设置一个简单的管道,尽管有多个输入.