我有一个脚本,包括从列表中打开文件,然后对该文件中的文本执行某些操作.我正在使用python多处理和Pool来尝试并行化这个操作.脚本的抽象如下:
import os
from multiprocessing import Pool
results = []
def testFunc(files):
for file in files:
print "Working in Process #%d" % (os.getpid())
#This is just an illustration of some logic. This is not what I'm actually doing.
for line in file:
if 'dog' in line:
results.append(line)
if __name__=="__main__":
p = Pool(processes=2)
files = ['/path/to/file1.txt', '/path/to/file2.txt']
results = p.apply_async(testFunc, args = (files,))
results2 = results.get()
Run Code Online (Sandbox Code Playgroud)
当我运行它时,每次迭代的进程id的打印输出都是相同的.基本上我正在尝试做的是获取输入列表的每个元素并将其分支到一个单独的进程,但似乎一个进程正在完成所有工作.
使用multiprocessing.Pool apply_async()时,代码中断会发生什么?我认为这包括例外情况,但可能还有其他因素会导致工作人员失败.
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
for f in files:
pool.apply_async(workerfunct, args=(*args), callback=callbackfunct)
Run Code Online (Sandbox Code Playgroud)
正如我现在所理解的那样,进程/工作程序失败(所有其他进程继续)并且没有执行抛出错误的任何内容,即使我用try/except捕获错误也是如此.
作为一个例子,通常我除了错误并输入默认值和/或打印出错误消息,然后代码继续.如果我的回调函数涉及写入文件,则使用默认值完成.
我怀疑你没有看到你的示例代码发生任何事情的原因是因为所有的工作者函数调用都失败了.如果worker函数失败,则永远不会执行回调.除非您尝试从apply_async调用返回的AsyncResult对象中获取结果,否则根本不会报告失败.但是,由于您没有保存任何这些对象,因此您永远不会知道发生的故障.如果我是你,我会在你测试时尝试使用pool.apply,这样你就会在发生错误时立即看到错误.