为什么我不能在multiprocessing.Pool中使用operator.itemgetter?

phi*_*hag 7 python multiprocessing python-multithreading

以下程序:

import multiprocessing,operator
f = operator.itemgetter(0)
# def f(*a): return operator.itemgetter(0)(*a)
if __name__ == '__main__':
    multiprocessing.Pool(1).map(f, ["ab"])
Run Code Online (Sandbox Code Playgroud)

失败,出现以下错误:

Process PoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.2/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
  File "/usr/lib/python3.2/multiprocessing/process.py", line 116, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.2/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python3.2/multiprocessing/queues.py", line 382, in get
    return recv()
TypeError: itemgetter expected 1 arguments, got 0
Run Code Online (Sandbox Code Playgroud)

为什么我会得到错误(在Linux x64上的cPython 2.7和3.2上),如果我取消注释第三行,为什么它会消失?

aba*_*ert 6

这里的问题是多处理模块通过复制将对象传递给其他进程(显然),并且itemgetter对象不能使用任何明显的方法进行复制:

In [10]: a = operator.itemgetter(0)
Out[10]: copy.copy(a)
TypeError: itemgetter expected 1 arguments, got 0

In [10]: a = operator.itemgetter(0)
Out[10]: copy.deepcopy(a)
TypeError: itemgetter expected 1 arguments, got 0

In [10]: a = operator.itemgetter(0)
Out[10]: pickle.dumps(a)
TypeError: can't pickle itemgetter objects

# etc.
Run Code Online (Sandbox Code Playgroud)

问题甚至没有试图在其他进程中调用f; 它试图首先复制它.(如果你看一下上面省略的堆栈跟踪,你会看到更多关于它失败原因的信息.)

当然,这通常无关紧要,因为在动态构建新的项目集时,复制一个项目几乎同样简单有效.这就是你的替代"f"功能正在做的事情.(当然,复制一个动态创建项目集的函数不需要复制项目集.)

你可以把"f"变成一个lambda.或者编写一个简单的函数(named或lambda),它可以在不使用itemgetter的情况下执行相同的操作.或者写一个可复制的itemgetter替换品(显然不会那么难).但是你不能直接使用itemgetter对象 - 就像你想要的那样.