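Sorry, I can't reproduce the error with a simpler example, and my code is too complicated to post. If I run the program in the IPython shell instead of regular Python, everything works fine.
I looked at some previous notes on this problem. They were all caused by using a pool to call a function defined inside a class function. That is not the case for me.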
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
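I would appreciate any help.
Update: The function I pickle is defined at the top level of the module. However, it calls a function that contains a nested function. That is, f() calls g(), which calls h(), which has a nested function i(), and I am calling pool.apply_async(f). f(), g() and h() are all defined at the top level. I tried a simpler example with this pattern and it does work.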
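A minimal Python 3 sketch of the pattern in the update (f, g, h and i are the hypothetical names from the text, and make_nested is added only for contrast). It suggests why the simple reproduction works: pickle serializes a top-level function by its module-qualified name, not by its body, so nested functions that f uses internally are never pickled. A nested function only breaks pickling when the function object itself has to travel through the pool, for example as an argument. Since the traceback comes from _handle_tasks, which pickles the task (function plus arguments), it may be worth checking everything actually passed to apply_async for function objects that are not reachable by name at module level.

```python
import pickle

def h():
    def i(x):  # nested helper: fine to call, never pickled itself
        return x + 1
    return i(1)

def g():
    return h()

def f():
    return g()

def make_nested():
    # Returns the nested function object itself, not its result
    def i(x):
        return x + 1
    return i

# f is defined at module level, so pickle stores only a reference to it
restored = pickle.loads(pickle.dumps(f))
print(restored())  # 2

# Pickling a nested function object fails: it has no importable name.
# (The exception type differs across Python versions, so catch the likely ones.)
try:
    pickle.dumps(make_nested())
    nested_picklable = True
except (pickle.PicklingError, AttributeError, TypeError):
    nested_picklable = False
print(nested_picklable)  # False
```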
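I'm having trouble with the multiprocessing module. I'm using a Pool of workers with its map method to load data from a large number of files, and the data from each file is analyzed with a custom function. Each time a file is processed I want to update a counter so that I can keep track of how many files remain to be processed. Here is sample code:
import os
from multiprocessing import Pool

def analyze_data(args):
    # do something
    counter += 1
    print counter

if __name__ == '__main__':
    list_of_files = os.listdir(some_directory)
    global counter
    counter = 0

    p = Pool()
    p.map(analyze_data, list_of_files)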
I can't find a solution.
I have the following function:
def copy_file(source_file, target_dir):
    pass
Now I want to use multiprocessing to run this function on all the files in parallel:
p = Pool(12)
p.map(lambda x: copy_file(x,target_dir), file_list)
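The problem is that lambdas can't be pickled, so this fails. What is the neatest (most Pythonic) way to get around this?
I want to use the multiprocessing library in Python. Sadly, multiprocessing uses pickle, which doesn't support functions with closures, lambdas, or functions in __main__. All three of these are important to me: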
In [1]: import pickle
In [2]: pickle.dumps(lambda x: x)
PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
Fortunately there is dill, a more robust pickle. Apparently dill performs some magic on import to make pickle work:
In [3]: import dill
In [4]: pickle.dumps(lambda x: x)
Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...
This is very encouraging, particularly because I don't have access to the multiprocessing source code. Sadly, I still can't get this very basic example to work:
import multiprocessing as mp
import dill
p = mp.Pool(4)
print p.map(lambda x: x**2, range(10))
Why is this? What am I missing? Exactly what are the limitations of the multiprocessing + dill combination?
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Temporary Edit for J.F Sebastian
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
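Exception …

I'm trying to write an application that applies a function concurrently with a multiprocessing.Pool. I'd like this function to be an instance method (so I can define it differently in different subclasses). That doesn't seem to be possible; as I have learned elsewhere, bound methods apparently can't be pickled. So why does starting a multiprocessing.Process with a bound method as its target work? The following code: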
import multiprocessing

def test1():
    print "Hello, world 1"

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print answer
        print
        for answer in pool.imap(self.square, range(10)):
            print answer

    def test2(self):
        print "Hello, world 2"

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

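if __name__ == "__main__": …

multiprocessing.Pool is driving me crazy...
I want to upgrade many packages, and for each one I have to check whether a newer version exists. This is done by the check_one function.
The main code is in the Updater.update method: there I create the Pool object and call its map() method.
Here is the code: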
def check_one(args):
    res, total, package, version = args
    i = res.qsize()
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
                i / float(total), package, i, total, addn=False)
    try:
        json = PyPIJson(package).retrieve()
        new_version = Version(json['info']['version'])
    except Exception as e:
        logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
        return
    if new_version > version:
        res.put_nowait((package, version, new_version, json))

class Updater(FileManager):

    # __init__ and other methods...

    def update(self):
        logger.info('Searching for updates')
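        packages = Queue.Queue() …

I'm having trouble using Pool.map_async() (and also Pool.map()) in the multiprocessing module. I have implemented a parallel-for-loop function that works fine as long as the function passed to Pool.map_async is a "regular" function. When the function is, for example, a method of a class, I get a PicklingError: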
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
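I use Python only for scientific computing, so I'm not very familiar with the concept of pickling; I only learned a bit about it today. I have looked at a couple of previous answers, such as "Can't pickle <type 'instancemethod'> when using Python's multiprocessing Pool.map()", but I can't figure out how to make it work, even after following the link provided in the answer.
My code is below; its goal is to simulate a vector of normal random variables using multiple cores. Note that this is just an example, and running it on multiple cores may not even pay off.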
import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat

def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
    """
    Purpose: Evaluate function using Multiple cores.

    Input:
        func       - Function to evaluate in parallel
        arg        - Array of arguments to evaluate func(arg)
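        static_arg - The "static" argument (if any), i.e. the variables that are constant in the evaluation …

I have an extension module for Python that uses SWIG as a wrapper, and I tried to serialize it with Pickle and failed =)
I want to implement the __reduce_ex__ method in my C++ code. Does anyone have an example of __reduce_ex__? There is a similar Stack Overflow question, but it omits the manager_constructor specification and implementation.

I want to run something like this: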
from multiprocessing import Pool
import time
import random

class Controler(object):
    def __init__(self):
        nProcess = 10
        pages = 10
        self.__result = []
        self.manageWork(nProcess,pages)

    def BarcodeSearcher(x):
        return x*x

    def resultCollector(self,result):
        self.__result.append(result)

    def manageWork(self,nProcess,pages):
        pool = Pool(processes=nProcess)
        for pag in range(pages):
            pool.apply_async(self.BarcodeSearcher, args = (pag, ), callback = self.resultCollector)
        print self.__result

if __name__ == '__main__':
    Controler()
But the code produces an error:
Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Python26\lib\threading.py", line 522, in __bootstrap_inner
    self.run()
  File "C:\Python26\lib\threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File …

The code looks like this:
import multiprocessing as mp
from functools import partial
import boto3
import numpy as np

s3 = boto3.client('s3')

def _something(**kwargs):
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)

def do(s3):
    archive = np.load(s3.get_object('some_key'))  # Simplified -- details not relevant
    pool = mp.Pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)
    out = pool.starmap(sub_process, [x for x in zip(parts, target_parts)])  # Error occurs at this line
    pool.close()
    pool.join()

do(s3)
Error:
_pickle.PicklingError: Can't pickle …