multiprocessing.Pool:使用apply_async的回调选项时调用辅助函数

eha*_*nom 4 python multiprocessing

apply_async调用iterable(?)函数和回调函数之间的工作流程如何?

设置:我正在读取2000文件目录中的所有文件的一些行,一些有数百万行,有些只有少数几行.提取一些标题/格式/日期数据以对每个文件进行特征化.这是在16 CPU机器上完成的,因此对它进行多处理是有意义的.

目前,预期的结果被发送到列表(ahlala),所以我可以打印出来; 之后,这将被写入*.csv.这是我的代码的简化版本,最初基于这个非常有用的帖子.

import multiprocessing as mp

def dirwalker(directory):
  ahlala = []

  # X() reads files and grabs lines, calls helper function to calculate
  # info, and returns stuff to the callback function
  def X(f): 
    fileinfo = Z(arr_of_lines) 
    return fileinfo 

  # Y() reads other types of files and does the same thing
  def Y(f): 
    fileinfo = Z(arr_of_lines)
    return fileinfo

  # results() is the callback function
  def results(r):
    ahlala.extend(r) # or .append, haven't yet decided

  # helper function
  def Z(arr):
    return fileinfo # to X() or Y()!

  for _,_,files in os.walk(directory):
    pool = mp.Pool(mp.cpu_count()
    for f in files:
      if (filetype(f) == filetypeX): 
        pool.apply_async(X, args=(f,), callback=results)
      elif (filetype(f) == filetypeY): 
        pool.apply_async(Y, args=(f,), callback=results)

  pool.close(); pool.join()
  return ahlala
Run Code Online (Sandbox Code Playgroud)

注意,代码工作,如果我把所有Z()的辅助函数,变成要么X(),Y()results(),但是这两种重复或可能比可能慢?我知道每个函数调用都会调用回调函数,但是什么时候调用了回调函数?它是否pool.apply_async()......完成了流程的所有工作?如果在第一个函数的范围(?)内调用这些辅助函数pool.apply_async()(在这种情况下),它不应该更快X()吗?如果没有,我应该把助手功能放进去results()吗?

其他相关的想法:守护进程是否为什么没有出现?我也很困惑如何排队,如果这是问题.这似乎是一个开始学习它的地方,但是在使用时可以安全地忽略排队apply_async,或者只是在显着的时间效率低下?

dan*_*ano 7

你在这里询问了很多不同的东西,所以我会尽力覆盖它:

callback一旦工作进程返回其结果,您传递给的函数将在主进程(而不是worker)中执行.它在Pool对象在内部创建的线程中执行.该线程result_queue使用a中的对象,该对象用于从所有工作进程获取结果.线程将结果从队列中拉出后,执行callback.当您的回调正在执行时,不能从队列中提取其他结果,因此回调快速完成非常重要.在您的示例中,只要其中一个调用XYapply_async完成的调用完成,结果将被放置到result_queue工作进程中,然后结果处理线程将结果从中拉出result_queue,并且您callback将执行.

其次,我怀疑你没有看到你的示例代码发生任何事情的原因是因为所有的工作函数调用都失败了.如果worker函数失败,callback则永远不会执行.除非您尝试从AsyncResult调用返回的对象中获取结果,否则根本不会报告失败apply_async.但是,由于您没有保存任何这些对象,因此您永远不会知道发生的故障.如果我是你,我会pool.apply在你测试时尝试使用,这样你就会在发生错误时立即看到错误.

工作者可能失败的原因(至少在您提供的示例代码中)是因为X并且Y被定义为另一个函数内的函数.multiprocessing通过在主进程中对它们进行pickle将函数和对象传递给工作进程,并在工作进程中对它们进行unpickling.在其他函数中定义的函数不可选,这意味着multiprocessing无法在工作进程中成功取消它们.要解决此问题,请在模块的顶层定义两个函数,而不是嵌入dirwalker函数.

你绝对应该继续打电话Z,XY不是打电话results.这样,Z可以在所有工作进程中并发运行,而不必在主进程中一次运行一个调用.请记住,您的callback功能应该尽可能快,因此您不会保留处理结果.Z在那里执行会减慢速度.

这里有一些简单的示例代码,与您正在执行的操作类似,希望能让您了解代码的外观:

import multiprocessing as mp
import os

# X() reads files and grabs lines, calls helper function to calculate
# info, and returns stuff to the callback function
def X(f): 
    fileinfo = Z(f) 
    return fileinfo 

# Y() reads other types of files and does the same thing
def Y(f): 
    fileinfo = Z(f)
    return fileinfo

# helper function
def Z(arr):
    return arr + "zzz"

def dirwalker(directory):
    ahlala = []

    # results() is the callback function
    def results(r):
        ahlala.append(r) # or .append, haven't yet decided

    for _,_,files in os.walk(directory):
        pool = mp.Pool(mp.cpu_count())
        for f in files:
            if len(f) > 5: # Just an arbitrary thing to split up the list with
                pool.apply_async(X, args=(f,), callback=results)  # ,error_callback=handle_error # In Python 3, there's an error_callback you can use to handle errors. It's not available in Python 2.7 though :(
            else:
                pool.apply_async(Y, args=(f,), callback=results)

    pool.close()
    pool.join()
    return ahlala


if __name__ == "__main__":
    print(dirwalker("/usr/bin"))
Run Code Online (Sandbox Code Playgroud)

输出:

['ftpzzz', 'findhyphzzz', 'gcc-nm-4.8zzz', 'google-chromezzz' ... # lots more here ]
Run Code Online (Sandbox Code Playgroud)

编辑:

您可以使用multiprocessing.Manager该类创建在父进程和子进程之间共享的dict对象:

pool = mp.Pool(mp.cpu_count())
m = multiprocessing.Manager()
helper_dict = m.dict()
for f in files:
    if len(f) > 5:
        pool.apply_async(X, args=(f, helper_dict), callback=results)
    else:
        pool.apply_async(Y, args=(f, helper_dict), callback=results)
Run Code Online (Sandbox Code Playgroud)

然后制作XY接受一个名为helper_dict(或任何你想要的名字)的第二个参数,然后你就完成了.

需要注意的是,这可以通过创建包含普通dict的服务器进程来工作,并且所有其他进程通过Proxy对象与该dict进行通信.因此,每当您阅读或写入字典时,您都在进行IPC.这使它比真正的字典慢很多.