Best practices for using the "multiprocessing" package in Python

Man*_*kat 12 python multiprocessing

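I am trying to use the multiprocessing module in Python. I have the sample code below, which runs without any errors in an IPython notebook. However, I see that extra Python processes are spawned in the background every time I execute the code block in the notebook.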

import multiprocessing as mp

def f(x):
    print "Hello World ", mp.current_process()
    return 1

pool = mp.Pool(3)

data = range(0,10)
pool.map(f, data)

However, when I save it in a regular .py file and execute it, I run into errors and have to kill the terminal to stop the program.

I fixed this by creating the pool under if __name__ == '__main__': and closing it with pool.close().
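A minimal sketch of the fix described above, assuming Python 3. The pool is created only under the main guard, and it is closed and joined once the work is done. The names square and run_pool are hypothetical and exist only for this illustration.

```python
import multiprocessing as mp


def square(x):
    """Worker function; defined at module level so it can be pickled."""
    return x * x


def run_pool(data, processes=3):
    """Map `square` over `data` in a pool, then shut the pool down."""
    pool = mp.Pool(processes=processes)
    try:
        return pool.map(square, data)
    finally:
        pool.close()  # no further tasks will be submitted
        pool.join()   # wait for the worker processes to exit


if __name__ == '__main__':
    # Only the main process reaches this point; workers that re-import
    # the module skip it and therefore never create pools of their own.
    print(run_pool(range(10)))
```

Without the guard, start methods that re-import the main module (such as spawn, the default on Windows) execute the module-level pool creation again in every worker, which is a likely cause of the errors seen when running the .py file.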

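I am curious which best practices one should follow when using multiprocessing and its related functions such as map, apply, apply_async, etc. I plan to use this module to read files in parallel and hope to apply it to a few ML algorithms to speed up the process.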

drs*_*oop 5

Overview, architecture, and some practical tips

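From my own (also limited) experience, I can share the following insights into how multiprocessing works and how to use it. I did not find the python.org manual very descriptive or graphical, so I read the code. For everyone who had the same impression, this is what I could work out so far: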

General good/best practice tips

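  • General implementation approach:
    • Test-drive with a reduced data size: you don't want to wonder for minutes whether it has crashed or is still computing.
    • Work step by step and profile the time:
      • First, implement and debug without multiprocessing.
      • Next, implement and debug with multiprocessing but a single process, profile the time, and compare the overhead against the run without multiple processes.
      • Next, increase the number of processes and profile the time to identify any GIL issues and waiting times.
  • Plain Processes, or lists of them, are useful for running a few functions with a one-to-one mapping of function to process.
  • Pools handle the distribution of batchable workloads (high-level tasks/commands) across a fixed set of Processes (the process pool).
  • Use Pool for CPU-bound tasks (high processor load with batchable inputs/outputs) and pool.ThreadPool for IO-bound tasks (low processor load with separate inputs/outputs).
  • For data transfer between Processes, Pools, Threads and ThreadPools, use queues.Queue and its subclasses (if the order of the results matters) or Pipes with a 1-to-1 mapping of PipeConnections to processes or threads.
  • To share variables of different types (BaseProxy, Namespaces, Queues, Pools) or to set up synchronization objects such as Barrier/Lock/RLock/Semaphore/Condition between different processes, use the Manager class.
  • If the GIL cannot be avoided, use a Manager to handle it, and try to separate the compute-intensive processes from the GIL-related computations (e.g. parsing in complex data structures), then connect them with Pipes or shared Queues.
  • Multiple Pools can be used to assign different numbers of processes to different tasks. Otherwise, just implement one Pool with multiple map or apply method calls.
  • Sequential parallel computation tasks that build on each other's intermediate results can be computed with one single Pool() and multiple Pool.(star)map_async() or Pool.(star)map() calls. To synchronize the tasks with each other, the ApplyResult() instance returned by the mapping function, with its methods ApplyResult().ready()/.wait()/.get()/.successful(), is the right choice.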
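The last tip can be sketched roughly as follows, assuming Python 3: two stages share one Pool, and the result object returned by map_async() is used to wait for the first stage before its output feeds the second. The functions square, add_one and two_stage are hypothetical names chosen for this illustration.

```python
import multiprocessing as mp


def square(x):
    return x * x


def add_one(x):
    return x + 1


def two_stage(data, processes=2):
    """Run two dependent stages on a single pool."""
    with mp.Pool(processes=processes) as pool:
        first = pool.map_async(square, data)
        first.wait()  # block until every stage-1 task has finished
        if not first.successful():
            raise RuntimeError('stage 1 raised an exception in a worker')
        # Stage 2 consumes the intermediate results of stage 1.
        second = pool.map_async(add_one, first.get())
        return second.get()


if __name__ == '__main__':
    print(two_stage([1, 2, 3]))  # [2, 5, 10]
```

Between wait() and the second map_async() call the main process is free to do other work, which is the main reason to prefer the asynchronous variant over two plain map() calls.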

Architecture and process flow

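  • When import multiprocessing is run, _current_process = MainProcess() is initialized. MainProcess is a subclass of BaseProcess but without target, args, kwargs and _parent_pid; it is basically a handle object for all other Processes in the already running Python kernel that imports multiprocessing.
  • pool.ThreadPool has an API analogous to Pool and probably also shares a similar architecture.
  • Pool is based on 3 daemon threads, Pool._task_handler, Pool._worker_handler and Pool._result_handler, which are connected by 1 internal queue.Queue(), Pool._taskqueue, and 2 internal SimpleQueues, Pool._inqueue and Pool._outqueue.
  • Pool._cache is a dictionary holding the ApplyResult and subclass instances from all Pool.apply_async()/_map_async() and sub-method calls, keyed by the global ApplyResult._job taken from job_counter().
  • The ApplyResults and subclass instances of a Pool can be found either in Pool._cache or as the return value of Pool.apply_async()/._map_async() and their sub-methods.
  • The difference between Pool.map() and Pool.map_async() is that Pool.map() == Pool.map_async().get(), which forces the main process to block until all results have been computed and stored in the returned ApplyResult() object.
  • The Queue/SimpleQueues in Pool:
    • Pool._taskqueue: pipes the high-level job of Pool.apply_async()/.map_async()/etc. from the apply method to Pool._task_handler.
    • Pool._inqueue: pipes the job as a batched "iterator" from Pool._task_handler to Pool._pool.Process(target=worker, ...).
    • Pool._outqueue: pipes the results from Pool._pool.Process(target=worker, ...) (initialized by Pool._worker_handler) to Pool._result_handler, which in turn _set()s them into the ApplyResults cached in Pool._cache[self._job].
  • ApplyResult holds the results as a list if the target func returns objects. Otherwise, ApplyResult() is just a handle for the synchronization methods, i.e. the methods that query the result status.
  • To connect processes and threads, 4 classes are offered, from high-level to simple, in the following order: queues.JoinableQueue, queues.Queue, SimpleQueue, Pipe/PipeConnection. Pipe is just a method that returns 2 actual PipeConnection class instances.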
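Two of the points above can be checked with a small sketch, assuming Python 3. It uses pool.ThreadPool rather than Pool so that nothing has to be pickled, relying on the two sharing the same map()/map_async() API. The helper names map_equivalence and pipe_round_trip are hypothetical.

```python
import multiprocessing as mp
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


def map_equivalence(data):
    """Return the results of map() and of map_async().get() side by side."""
    with ThreadPool(processes=2) as pool:
        blocking = pool.map(square, data)            # blocks until done
        async_result = pool.map_async(square, data)  # returns immediately
        return blocking, async_result.get()          # get() blocks instead


def pipe_round_trip(obj):
    """Send a small object through a pipe and read it back from the other end."""
    end_a, end_b = mp.Pipe()  # Pipe() returns two connection objects
    try:
        end_a.send(obj)
        return end_b.recv()
    finally:
        end_a.close()
        end_b.close()


if __name__ == '__main__':
    blocking, via_async = map_equivalence(range(5))
    print(blocking == via_async)
    print(pipe_round_trip({'answer': 42}))
```

The round trip stays inside one process only to keep the sketch short; in practice each end of the pipe is handed to a different process or thread.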

Some code examples

import logging
import multiprocessing as mp
import random
import time
import numpy as np
from copy import deepcopy

MODEL_INPUTS = ["input_ids", "mc_token_ids", "lm_labels", "mc_labels", "token_type_ids"]

mp.log_to_stderr(level=logging.INFO)  # mp.log_to_stderr(level=logging.DEBUG)
logger = mp.get_logger()
logger.setLevel(level=logging.INFO)  # logger.setLevel(level=logging.DEBUG)


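def secs2hms(seconds, num_decimals=4):
    """Split `seconds` into [hours, minutes, seconds], keeping fractional seconds."""
    minutes, whole_seconds = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    hms_time = [hours, minutes, whole_seconds]
    if hasattr(seconds, '__round__'):
        hms_time[-1] += round(seconds, num_decimals) - int(seconds)
    return hms_time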


class Timer():
    def __init__(self, time_name, log_method=print, time_format='hms', hms_decimals=4):
        self.time_name = time_name
        # `log_method` is a callable such as `print` or `logger.info`
        self.output_method = log_method
        self.time_format = time_format
        self.hms_decimals = hms_decimals
        self.start_time = time.time()

    def start(self):
        raise RuntimeError('Timer was already started at initialization.')

    def stop(self, *args):
        seconds_time = time.time() - self.start_time
        time_name = self.time_name.format(*args)
        if self.time_format == 'hms':
            hms_time = secs2hms(seconds=seconds_time, num_decimals=self.hms_decimals)
            hms_time = ' '.join([text.format(dt) for dt, text in zip(hms_time, ['{}h', '{}min', '{}sec']) if dt > 0])
            self.output_method('{} = {}'.format(time_name, hms_time))
        else:
            self.output_method('{} = {}sec'.format(time_name, seconds_time))
        self._delete_timer()

    def _delete_timer(self):
        del self


def get_log_method(method_name):
    if method_name == 'debug':
        log_method = logger.debug
    elif method_name == 'info':
        log_method = logger.info
    else:
        log_method = print
    return log_method


def _generate_random_array(shape):
    return np.array([[[random.randint(0, 1000)
                       for _ in range(shape[2])]
                      for _ in range(shape[1])]
                     for _ in range(shape[0])])


def random_piped_array(shape, pipe_in, log_method_name='print', log_name='RANDOM'):
    log_method = get_log_method(method_name=log_method_name)
    array = _generate_random_array(shape=shape)
    log_method('{}: sending `array` through `pipe_in`'.format(log_name))
    pipe_in.send(array)


def random_array(shape, log_method_name='print', log_name='RANDOM'):
    log_method = get_log_method(method_name=log_method_name)
    assert len(shape) == 3
    array = _generate_random_array(shape=shape)
    log_method('{}: returning `array`'.format(log_name))
    # for dataset_name in ['train', 'valid']:
    #     shared_arrays[dataset_name].append(array)
    return array


def random_shared_array(shape, shared_arrays, log_method_name='print', log_name='SHARED_RANDOM'):
    log_method = get_log_method(method_name=log_method_name)
    assert len(shape) == 3
    array = _generate_random_array(shape=shape)
    log_method('{}: append `array` to `shared_array`'.format(log_name))
    shared_arrays.append(array)


def random_nested_array(shape, nested_shared_arrays, dataset_name, log_method_name='print', log_name='NESTED_RANDOM'):
    log_method = get_log_method(method_name=log_method_name)
    log_method('{}: appending array to shared_arrays[\'{}\']'.format(log_name, dataset_name))
    assert len(shape) == 3
    array = _generate_random_array(shape=shape)
    log_method('{}: appending `array` to `shared_array` with currently len(nested_shared_array[\'{}\']) = {}'.format(
        log_name, dataset_name, len(nested_shared_arrays[dataset_name])))
    nested_shared_arrays[dataset_name].append(array)


def nested_dict_list_deepcopy(nested_shared_arrays):
    """No hierarchical switching between mp.manager.BaseProxy and unshared elements"""
    nested_unshared_arrays = dict()
    for key, shared_list in nested_shared_arrays.items():
        nested_unshared_arrays[key] = deepcopy(shared_list)
    return nested_unshared_arrays


def log_arrays_state(arrays, log_method_name='print', log_name='ARRAY_STATE'):
    log_method = get_log_method(method_name=log_method_name)
    log_method('{}: type(arrays) = {}'.format(log_name, type(arrays)))
    try:
        if hasattr(arrays, '__len__'):
            log_method('{}: len(arrays) = {}'.format(log_name, len(arrays)))
            if len(arrays) < 20:
                for idx, array in enumerate(arrays):
                    log_method('{}: type(arrays[{}]) = {}'.format(log_name, idx, type(array)))
                    if hasattr(array, 'shape'):
                        log_method('{}: arrays[{}].shape = {}'.format(log_name, idx, array.shape))
                    else:
                        log_method('{}: arrays[{}] has no `shape` attribute'.format(log_name, idx))
        else:
            log_method('{}: array has no `__len__` method'.format(log_name))

    except BrokenPipeError as error_msg:
        log_method('{}: BrokenPipeError: {}'.format(log_name, error_msg))


def log_nested_arrays_state(nested_arrays, log_method_name='print', log_name='NESTED_ARRAY_STATE'):
    log_method = get_log_method(method_name=log_method_name)
    log_method('{}: type(arrays) = {}'.format(log_name, type(nested_arrays)))
    for key, arrays in nested_arrays.items():
        log_arrays_state(arrays=arrays, log_name=log_name + '_' + key.upper(), log_method_name=log_method_name)


if __name__ == '__main__':
    log_method = logger.info
    # log_method cannot be pickled in map_async, therefore an extra log_method_name string is implemented to hand
    # through
    log_method_name = 'info'
    num_samples = 100
    num_processes = 1  # len(MODEL_INPUTS)  #
    array_shapes = [(num_samples, random.randint(2, 5), random.randint(100, 300)) for _ in range(len(MODEL_INPUTS))]


    def stdout_some_newlines(num_lines=2, sleep_time=1):
        print(''.join(num_lines * ['\n']))
        time.sleep(sleep_time)

    # Pool with results from `func` with `return` received from `AsyncResult`(=`ApplyResult`)
    # `AsyncResult` also used for process synchronization, e.g. waiting for processes to finish
    log_method('MAIN: setting up `Pool.map_async` with `return`ing `func`')
    async_return_timer = Timer(time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
                               log_method=log_method)
    # Pool with variable return
    setup_pool_timer = Timer(time_name='TIMER_SETUP: time to set up pool with {} processes'.format(num_processes),
                             log_method=log_method)
    with mp.Pool(processes=num_processes) as pool:
        setup_pool_timer.stop()
        arrays = pool.starmap_async(func=random_array, iterable=[(shape, log_method_name) for shape in array_shapes])
        getted_arrays = arrays.get()
        async_return_timer.stop()
        # Logging array state inside the `pool` context manager
        log_method('MAIN: arrays from `pool.map_async() return` within the `pool`\'s context manager:')
        log_arrays_state(arrays=arrays, log_method_name=log_method_name)
        log_method('MAIN: arrays.get() from `pool.map_async() return` within the `pool`\'s context manager:')
        log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
    # Logging array state outside the `pool` context manager
    log_method('MAIN: arrays from `pool.map_async() return` outside the `pool`\'s context manager:')
    log_arrays_state(arrays=arrays, log_method_name=log_method_name)
    log_method('MAIN: arrays.get() from `pool.map_async() return` outside the `pool`\'s context manager:')
    log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
    del pool, arrays, getted_arrays
    stdout_some_newlines()


    # Functionality of `mp.Process().is_alive()`
    log_method('IS_ALIVE: testing functionality of flag `mp.Process().is_alive()` w.r.t. process status')
    # `pow` is a picklable builtin; a lambda target cannot be pickled under the `spawn` start method
    p = mp.Process(target=pow, args=(10, 2))
    log_method('IS_ALIVE: after initializing, before starting: p.is_alive() = {}'.format(p.is_alive()))
    p.start()
    log_method('IS_ALIVE: after starting, before joining: p.is_alive() = {}'.format(p.is_alive()))
    time.sleep(5)
    log_method('IS_ALIVE: after sleeping 5sec, before joining: p.is_alive() = {}'.format(p.is_alive()))
    p.join()
    log_method('IS_ALIVE: after joining: p.is_alive() = {}'.format(p.is_alive()))
    p.terminate()
    del p
    stdout_some_newlines()

    # Pool with `func` `return`ing results directly to the result handler from `mp.Pool().starmap_async()` of type
    # `AsyncResult()`
    log_method(
        'MAIN: `Pool.map()` is not tested explicitly because `Pool.map() == Pool.map_async().get()`')
    stdout_some_newlines()


    # Pool with results assigned to shared variable & `AsyncResult` only used for process synchronization but
    # not for result receiving

    log_method(
        'MAIN: setting up Manager(), Manager.list() as shared variable and Pool.starmap_async with results from shared '
        'variable')
    async_shared_timer = Timer(
        time_name='TIMER_POOL_SHARED: time for random array with {} processes'.format(num_processes),
        log_method=log_method)
    setup_shared_variable_timer = Timer(time_name='TIMER_INIT: time to set up shared variable', log_method=log_method)
    with mp.Manager() as sync_manager:
        shared_arrays = sync_manager.list()
        setup_shared_variable_timer.stop()
        async_return_timer = Timer(
            time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
            log_method=log_method)
        setup_pool_timer = Timer(
            time_name='TIMER_POOL_INIT: time to set up pool with {} processes'.format(num_processes),
            log_method=log_method)
        with mp.Pool(processes=num_processes) as pool:
            setup_pool_timer.stop()
            async_result = pool.starmap_async(
                func=random_shared_array,
                iterable=[(shape, shared_arrays, log_method_name) for shape in array_shapes])
            log_method('MAIN: async_result.ready() before async_result.wait() = {}'.format(async_result.ready()))
            async_result.wait()
            log_method('MAIN: async_result.ready() after async_result.wait() = {}'.format(async_result.ready()))
            log_method('MAIN: async_result.successful() after async_result.wait() = {}'.format(async_result.successful()))
            async_return_timer.stop()

            copy_timer = Timer('TIMER_COPY: time to copy shared_arrays to standard arrays', log_method=log_method)
            unshared_arrays = deepcopy(shared_arrays)
            copy_timer.stop()
            async_shared_timer.stop()
            log_method('MAIN: shared_arrays from `pool.map_async()` within `sync_manager` context manager:')
            log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
            log_method(
                'MAIN: unshared_arrays = deepcopy(shared_arrays) from `pool.map_async()` within `sync_manager`\'s '
                'context manager:')
            log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)

    log_method('MAIN: shared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
    log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
    log_method('MAIN: unshared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
    log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
    del sync_manager, shared_arrays, async_result, pool, unshared_arrays
    stdout_some_newlines()

    # Same as above just with pipe instead of `shared_arrays`
    log_method('MAIN: separate process outputting to `mp.Pipe()`')
    process_pipe_timer = Timer(time_name='TIMER_PIPE: time for `random_piped_array` outputting through a `mp.Pipe()`')
    arrays = list()
    pipe_in, pipe_out = mp.Pipe()
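    # Initialize processes. NOTE: all processes write to the same end of one pipe; the `multiprocessing` docs warn
    # that data may become corrupted if several processes write to the same end at the same time, so real code
    # should use one pipe per process or guard `send()` with a lock.
    processes = [mp.Process(target=random_piped_array, args=(shape, pipe_in, log_method_name)) for shape in
                 array_shapes]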
    # Start processes
    for process in processes:
        process.start()
    # Collect piped arrays from the pipe and append them to `arrays`
    while (any([process.is_alive() for process in processes]) or pipe_out.poll()) and len(arrays) < len(MODEL_INPUTS):
        log_method(
            'RANDOM: receiving arrays through pipe and appending to arrays with currently len(arrays) = {}'.format(
                len(arrays)))
        arrays.append(pipe_out.recv())
    # join processes
    for process in processes:
        process.join()
    process_pipe_timer.stop()
    log_arrays_state(arrays=arrays, log_method_name=log_method_name)
    pipe_in.close()
    pipe_out.close()
    del arrays, pipe_in, pipe_out, processes, process
    stdout_some_newlines()

    # Nested shared dict/list/arrays
    log_method('MAIN: `random_nested_array` with nested shared `mp.Manager().dict()` and `mp.Manager().list()`s')
    nested_timer = Timer(time_name='TIMER_NESTED: time for `random_nested_array()`')
    with mp.Manager() as sync_manager:
        nested_shared_arrays = sync_manager.dict()
        nested_shared_arrays['train'] = sync_manager.list()
        nested_shared_arrays['valid'] = sync_manager.list()
        with mp.Pool(processes=num_processes) as pool:
            nested_results = pool.starmap_async(func=random_nested_array,
                                                iterable=[(shape, nested_shared_arrays, dataset_name, log_method_name)
                                                          for dataset_name in nested_shared_arrays.keys()
                                                          for shape in array_shapes])
            nested_results.wait()
            unshared_nested_arrays = nested_dict_list_deepcopy(nested_shared_arrays)
            nested_timer.stop()
    log_nested_arrays_state(nested_arrays=unshared_nested_arrays, log_method_name=log_method_name)
    del sync_manager, nested_shared_arrays, pool, nested_results, unshared_nested_arrays
    stdout_some_newlines()

    # List of processes targeted directly to their functions one by one
    log_method(
        'MAIN: separate process outputting to shared `mp.Manager.list()` with process handles maintained in list()')
    log_method('MAIN: separate process implementations are only preferred over pools for 1-to-1=processes-to-tasks'
               ' relations or asynchronous single tasks calculations.')
    processes_timer = Timer(
        time_name='TIMER_PROCESS: time for `random_shared_array` with separate {} processes'.format(num_processes),
        log_method=log_method)
    with mp.Manager() as sync_manager:
        shared_arrays = sync_manager.list()
        # Initialize processes
        processes = [mp.Process(target=random_shared_array, args=(shape, shared_arrays, log_method_name))
                     for shape in array_shapes]
        # Start processes
        for process in processes:
            process.start()
        processes_timer.stop()
        # Join processes = wait for processes to finish
        for process in processes:
            process.join()
        unshared_process_arrays = deepcopy(shared_arrays)
        processes_timer.stop()
    log_arrays_state(arrays=unshared_process_arrays, log_method_name=log_method_name)
    del sync_manager, shared_arrays, unshared_process_arrays, processes, process
    stdout_some_newlines()


小智 1

The official Python documentation has plenty of usage examples. That is probably the best way to learn the best practices: http://docs.python.org/2/library/multiprocessing.html