我想要一个长时间运行的进程来返回它在队列(或类似的东西)上的进度,我将把它提供给进度条对话框.完成该过程后,我还需要结果.这里的测试示例失败了RuntimeError: Queue objects should only be shared between processes through inheritance.
import multiprocessing, time
def task(args):
count = args[0]
queue = args[1]
for i in xrange(count):
queue.put("%d mississippi" % i)
return "Done"
def main():
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
result = pool.map_async(task, [(x, q) for x in range(10)])
time.sleep(1)
while not q.empty():
print q.get()
print result.get()
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
我已经能够得到这个使用单个进程对象的工作(在这里我很 alowed传递一个队列引用),但是我没有一个池来管理许多流程我要启动.有关更好的模式的建议吗?
我有一个模糊的字符串匹配脚本,在400万公司名称的大海捞针中寻找大约30K针.虽然脚本工作正常,但我在AWS h1.xlarge上通过并行处理加速处理的尝试失败了,因为我的内存不足.
我不想试图获得更多内存,而是根据我之前的问题解释,我想找出如何优化工作流程 - 我对此很新,所以应该有足够的空间.顺便说一句,我已经尝试过排队(虽然工作但是遇到了同样的问题MemoryError,再看看一堆非常有用的SO贡献,但还没有完成.)
这是与代码最相关的内容.我希望它足以澄清逻辑 - 很高兴根据需要提供更多信息:
def getHayStack():
## loads a few million company names into id: name dict
return hayCompanies
def getNeedles(*args):
## loads subset of 30K companies into id: name dict (for allocation to workers)
return needleCompanies
def findNeedle(needle, haystack):
""" Identify best match and return results with score """
results = {}
for hayID, hayCompany in haystack.iteritems():
if not isnull(haystack[hayID]):
results[hayID] = levi.setratio(needle.split(' '),
hayCompany.split(' '))
scores = list(results.values()) …Run Code Online (Sandbox Code Playgroud) python performance memory-management multiprocessing string-matching