如何在Python中使用多处理队列?

jab*_*jab 68 python multithreading multiprocessing

我在尝试理解多处理队列如何在python上工作以及如何实现它时遇到了很多麻烦.假设我有两个从共享文件访问数据的python模块,让我们将这两个模块称为编写者和读者.我的计划是让读取器和写入器将请求放入两个单独的多处理队列,然后让第三个进程在循环中弹出这些请求并执行.

我的主要问题是我真的不知道如何正确实现multiprocessing.queue,你不能真正实例化每个进程的对象,因为它们将是独立的队列,你如何确保所有进程都与共享队列相关(或者在这种情况下,队列)

Mik*_*ton 90

我的主要问题是我真的不知道如何正确实现multiprocessing.queue,你不能真正实例化每个进程的对象,因为它们将是独立的队列,你如何确保所有进程都与共享队列相关(或者在这种情况下,队列)

这是一个共享单个队列的读写器的简单例子......作者向读者发送了一堆整数; 当编写器用尽数字时,它会发送'DONE',让读者知道如何摆脱读取循环.

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))
Run Code Online (Sandbox Code Playgroud)

  • 很好的例子.正如解决OP混淆的一些额外信息......这个例子表明共享队列需要来自主进程,然后传递给它的所有子进程.为了使两个完全不相关的进程共享数据,它们必须通过某些中央或相关的网络设备(例如套接字)进行通信.有些东西必须协调信息. (8认同)
  • 很好的例子..我也是这个主题的新手..如果我有多个进程运行相同的目标函数(使用不同的参数),如何确保他们在将数据放入队列时不会发生冲突..是必要的锁? (4认同)
  • @bharat_iyengar 从多处理模块文档中,它说队列是使用一些锁/信号量实现的。因此,当您使用 get() 和 put(object) Queue 方法时,如果其他进程/线程尝试在队列中获取或放置某些内容,队列将阻塞。所以你不必担心手动锁定它。 (3认同)
  • 显式停止条件优于隐式停止条件 (3认同)
  • 如果队列读取器超过了队列写入器的速率,则Qsize可以为零 (2认同)

Joe*_*way 14

这是一个非常简单的multiprocessing.Queueand用法,multiprocessing.Process它允许调用者将“事件”和参数发送到一个单独的进程,该进程将事件分派到进程上的“do_”方法。(Python 3.4+)

import multiprocessing as mp
import collections

Msg = collections.namedtuple('Msg', ['event', 'args'])

class BaseProcess(mp.Process):
    """A process backed by an internal queue for simple one-way message passing.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        """Puts the event and args as a `Msg` on the queue
        """
       msg = Msg(event, args)
       self.queue.put(msg)

    def dispatch(self, msg):
        event, args = msg

        handler = getattr(self, "do_%s" % event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % event)

        handler(*args)

    def run(self):
        while True:
            msg = self.queue.get()
            self.dispatch(msg)
Run Code Online (Sandbox Code Playgroud)

用法:

class MyProcess(BaseProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)

if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')
Run Code Online (Sandbox Code Playgroud)

send在父进程发生,do_*发生在子进程。

我省略了任何明显会中断运行循环并退出子进程的异常处理。您还可以通过覆盖run来控制阻塞或其他任何东西来自定义它。

这实际上仅在您只有一个工作进程的情况下才有用,但我认为这是对这个问题的相关答案,以演示具有更多面向对象的常见场景。

  • 出色的答案!谢谢。+50:) (2认同)

小智 10

我查看了堆栈溢出和网络上的多个答案,同时尝试设置一种使用队列进行多处理的方法,以传递大型 Pandas 数据帧。在我看来,每个答案都在重复相同类型的解决方案,而没有考虑在设置此类计算时肯定会遇到的众多边缘情况。问题是同时有很多事情在起作用。任务数、worker 数、每个任务的持续时间以及任务执行过程中可能出现的异常。所有这些都使同步变得棘手,并且大多数答案都没有解决您如何进行同步。所以这是我摆弄几个小时后的看法,希望这对于大多数人来说足够通用以发现它有用。

在任何编码示例之前的一些想法。由于queue.Emptyqueue.qsize()或任何其他类似的方法对于流量控制不可靠,任何类似的代码

while True:
    try:
        task = pending_queue.get_nowait()
    except queue.Empty:
        break
Run Code Online (Sandbox Code Playgroud)

是假的。即使几毫秒之后另一个任务出现在队列中,这也会杀死工作人员。工人不会恢复,一段时间后所有工人都会消失,因为他们随机发现队列暂时为空。最终结果将是主要的多处理函数(进程中带有 join() 的函数)将在所有任务尚未完成的情况下返回。好的。如果您有数千个任务并且缺少一些任务,那么祝您在调试过程中好运。

另一个问题是哨兵值的使用。许多人建议在队列中添加一个哨兵值来标记队列的末尾。但是究竟要向谁举报呢?如果有 N 个 worker,假设 N 是可用的内核数量,那么单个哨兵值只会向一个 worker 标记队列的末尾。当没有剩下的工作时,所有其他工人将坐着等待更多工作。我见过的典型例子是

while True:
    task = pending_queue.get()
    if task == SOME_SENTINEL_VALUE:
        break
Run Code Online (Sandbox Code Playgroud)

一名工人将获得哨兵值,而其余工人将无限期等待。我遇到的任何帖子都没有提到您需要将哨兵值提交到队列的次数至少与您有工人的次数相同,以便所有人都能得到它。

另一个问题是任务执行期间的异常处理。同样,这些应该被捕获和管理。此外,如果您有一个completed_tasks队列,您应该在决定工作完成之前以确定性的方式独立计算队列中有多少项目。再次依赖队列大小必然会失败并返回意外结果。

在下面的示例中,该par_proc()函数将接收一个任务列表,其中包括应与任何命名参数和值一起执行这些任务的函数。

import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil

SENTINEL = None


def do_work(tasks_pending, tasks_completed):
    # Get the current worker's name
    worker_name = mp.current_process().name

    while True:
        try:
            task = tasks_pending.get_nowait()
        except queue.Empty:
            print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
            time.sleep(0.01)
        else:
            try:
                if task == SENTINEL:
                    print(worker_name + ' no more work left to be done. Exiting...')
                    break

                print(worker_name + ' received some work... ')
                time_start = time.perf_counter()
                work_func = pickle.loads(task['func'])
                result = work_func(**task['task'])
                tasks_completed.put({work_func.__name__: result})
                time_end = time.perf_counter() - time_start
                print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
            except Exception as e:
                print(worker_name + ' task failed. ' + str(e))
                tasks_completed.put({work_func.__name__: None})


def par_proc(job_list, num_cpus=None):

    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set-up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Use as many workers as there are cores (usually chokes the system so better use less)
    num_workers = num_cpus

    # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
    # work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set-up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1

    for p in processes:
        p.join()

    return results
Run Code Online (Sandbox Code Playgroud)

这是运行上述代码的测试

def test_parallel_processing():
    def heavy_duty1(arg1, arg2, arg3):
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert job1 == 15
    assert job2 == 21
Run Code Online (Sandbox Code Playgroud)

加上另一个有一些例外

def test_parallel_processing_exceptions():
    def heavy_duty1_raises(arg1, arg2, arg3):
        raise ValueError('Exception raised')
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert not job1
    assert job2 == 21
Run Code Online (Sandbox Code Playgroud)

希望这是有帮助的。


小智 7

在" from queue import Queue"中没有调用模块queue,而multiprocessing应该使用.因此,它应该看起来像" from multiprocessing import Queue"

  • 虽然多年来使用`multiprocessing.Queue`是正确的.正常的`Queue.Queue`用于python*threads*.当您尝试将`Queue.Queue`与多处理一起使用时,将在每个子进程中创建Queue对象的副本,并且永远不会更新子进程.基本上,`Queue.Queue`使用全局共享对象,而`multiprocessing.Queue`使用IPC.请参阅:/sf/ask/64757031/ (7认同)