谁在使用多处理池的apply_async方法时运行回调?

Ale*_*lex 34 python parallel-processing callback multiprocessing

我正在尝试了解使用多处理池的apply_sync方法时幕后发生的一些事情.

谁运行回调方法?它是调用apply_async的主要进程吗?

假设我发送了一大堆带回调的apply_async命令,然后继续我的程序.当apply_async开始完成时,我的程序仍在执行操作.当主进程仍然忙于脚本时,回调是如何运行我的"主进程"的?

这是一个例子.

import multiprocessing
import time

def callback(x):
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)

def func(x):
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
    return x

pool = multiprocessing.Pool()

args = range(20)

for a in args:
    pool.apply_async(func, (a,), callback=callback)

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)

t0 = time.time()
while time.time() - t0 < 60:
    pass

print 'Finished with the script'
Run Code Online (Sandbox Code Playgroud)

输出就像是

使用arg 0运行func的PoolWorker-1

PoolWorker-2使用arg 1运行func

PoolWorker-3使用arg 2运行func

MainProcess进入休眠状态< - 主进程正忙

PoolWorker-4使用arg 3运行func

使用arg 4运行func的PoolWorker-1

PoolWorker-2使用arg 5运行func

PoolWorker-3使用arg 6运行func

PoolWorker-4使用arg 7运行func

MainProcess运行回调与arg 0 < - 主进程运行回调,而它仍然在while循环!

MainProcess运行带有arg 1的回调

MainProcess使用arg 2运行回调

MainProcess使用arg 3运行回调

MainProcess使用arg 4运行回调

使用arg 8运行func的PoolWorker-1

...

用脚本完成

MainProcess如何在while循环的中间运行回调?

关于multiprocessing.Pool的文档中有关于回调的声明似乎是一个提示,但我不明白.

apply_async(func [,args [,kwds [,callback]]])

apply()方法的一种变体,它返回一个结果对象.

如果指定了回调,则它应该是可调用的,它接受单个参数.当结果变为就绪时,将对其应用回调(除非调用失败).回调应该立即完成,否则处理结果的线程将被阻止.

dan*_*ano 32

文档中确实有一个暗示:

回调应该立即完成,否则处理结果的线程将被阻止.

回调在主进程中处理,但它们在自己的单独线程中运行.创建Pool它时,它实际上在Thread内部创建了一些对象:

class Pool(object):
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        ... # stuff we don't care about
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN 
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN 
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()
Run Code Online (Sandbox Code Playgroud)

对我们来说有趣的是_result_handler:我们很快就会明白为什么.

切换齿轮一秒钟,当你运行时apply_async,它会在ApplyResult内部创建一个对象来管理从子进程中获取结果:

def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self


    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,假设任务成功,该_set方法最终实际执行callback传入的方法.还要注意它cache在最后添加到全局字典__init__.

现在,回到_result_handler线程对象.该对象调用该_handle_results函数,如下所示:

    while 1:
        try:
            task = get()
        except (IOError, EOFError):
            debug('result handler got EOFError/IOError -- exiting')
            return

        if thread._state:
            assert thread._state == TERMINATE
            debug('result handler found thread._state=TERMINATE')
            break

        if task is None:
            debug('result handler got sentinel')
            break

        job, i, obj = task
        try:
            cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!
        except KeyError:
            pass

        # More stuff
Run Code Online (Sandbox Code Playgroud)

这是一个循环,只是从子队列中取出结果,找到它的条目cache,并调用_set,执行我们的回调.即使你处于循环中它也可以运行,因为它没有在主线程中运行.

  • @Alex是的,回调将按顺序执行.`_result_handler`线程从队列中拉出一个完成的任务,调用`_set`(运行回调),然后继续下一个.这就是为什么文档说要确保回调立即完成的原因; 执行回调会阻止处理其他结果. (6认同)
  • 感谢达诺花时间写下如此详细的回复!如果我理解正确的话,池会创建一个*单个*新线程(result_handler),其工作就是等待 apply_async 完成,然后在 result_handler 的线程(它是 MainProcess 的一部分)中调用回调。回调(对于单个池对象)是否会按顺序调用?即,一堆 apply_async 可能会一起完成,但回调将由 result_handler 一个接一个地串行运行? (3认同)
  • 还有一个问题.如果回调函数和主脚本都混淆了相同的对象(在MainProcess中)怎么办?会有不可预测的行为吗?即如果回调和主脚本中的某些内容都尝试写入同一文件或修改相同的数组.当回调实际运行时,谁知道主脚本将在那时做什么. (3认同)
  • @Alex你肯定需要担心你在回调中改变的任何对象的线程安全性.一般来说,我建议在回调中尽可能少地做,但如果你绝对需要触摸共享状态,你必须使用某种互斥锁来保护它. (3认同)