获取“队列对象仅应通过继承在进程之间共享”,但我没有使用队列

Tha*_*ing 1 python concurrent.futures python-multiprocessing

我正在尝试使用ProcessPoolExecutor,但是出现错误“队列对象仅应通过继承在进程之间共享”,但我没有使用队列(至少没有明确地使用)。我找不到任何可以解释我做错事情的信息。

这是一些演示问题的代码(不是我的实际代码):

from concurrent.futures import ProcessPoolExecutor, as_completed

class WhyDoesntThisWork:

    def __init__(self):
        self.executor = ProcessPoolExecutor(4)

    def execute_something(self, starting_letter):
        futures = [self.executor.submit(self.something, starting_letter, d) for d in range(4)]
        letter = None
        for future in as_completed(futures):
            letter = future.result()
        print(letter)

    def something(self, letter, d):
        # do something pointless for the example
        for x in range(d):
            letter = chr(ord(letter) + 1)

if __name__ == '__main__':
    WhyDoesntThisWork(). execute_something('A')
Run Code Online (Sandbox Code Playgroud)

El Ruso指出,使something()成为静态方法或类方法会使错误消失。不幸的是,我的实际代码需要使用self调用其他方法。

小智 5

无需使用静态方法即可解决。

使用进程时,每个进程都在独立的内存空间中运行。这与使用线程不同,当不同的线程在相同的进程下使用相同的内存空间运行时,这是不同的。因此,使用ThreadPoolExecutor时不会发生错误, 而是在中发生ProcessPoolExecutor

因此,当将类实例的功能传递到单独的子流程中时,多处理机制会对该功能进行酸洗,以便可以将功能作为独立实例传递到子流程中。并且当子流程加入时,该类由未选出的函数实例更新。

要使其工作,只需在类中添加__getstate__()__setstate__()函数即可指导该类如何对函数进行腌制和解腌。在酸洗中,可以排除不必要的字段,如图所示del self_dict['executor']

import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


class GuessItWorksNow():

    def __init__(self):
        self.executor = ProcessPoolExecutor(4)

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['executor']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

    def something(self, letter, d):
        # do something pointless for the example
        p = multiprocessing.current_process()
        time.sleep(1)
        for x in range(d):
            letter = chr(ord(letter) + 1)
        return (f'[{p.pid}] ({p.name}) ({letter})')

    def execute_something(self, starting_letter):
        futures = [self.executor.submit(self.something, starting_letter, d) for d in range(10)]
        for future in as_completed(futures):
            print(future.result())


if __name__ == '__main__':
    obj = GuessItWorksNow()
    obj.execute_something('A')
Run Code Online (Sandbox Code Playgroud)