我想在此代码中使用多进程包.我试图调用该函数create_new_population
并将数据分发到8个处理器,但是当我这样做时,我得到了pickle错误.
通常函数会像这样运行: self.create_new_population(self.pop_size)
我尝试像这样分发工作:
f= self.create_new_population
pop = self.pop_size/8
self.current_generation = [pool.apply_async(f, pop) for _ in range(8)]
Run Code Online (Sandbox Code Playgroud)
我得到
或Can't pickle local object 'exhaust.__init__.<locals>.tour_select'
PermissionError: [WinError 5] Access is denied
我仔细阅读了这个帖子,并尝试使用Steven Bethard的方法绕过错误,允许通过copyreg进行方法酸洗/ 取消:
def _pickle_method(method)
def _unpickle_method(func_name, obj, cls)
Run Code Online (Sandbox Code Playgroud)
我还尝试使用pathos
包没有任何运气.
我知道应该在if __name__ == '__main__':
块下调用代码,但我想知道是否可以在代码中尽可能少的更改来完成.
python parallel-processing multithreading python-multiprocessing pathos
我正在对我拥有的数据集运行拼写校正功能。我曾经from pathos.multiprocessing import ProcessingPool as Pool
做过这项工作。处理完成后,我想实际访问结果。这是我的代码:
import codecs
import nltk
from textblob import TextBlob
from nltk.tokenize import sent_tokenize
from pathos.multiprocessing import ProcessingPool as Pool
class SpellCorrect():
def load_data(self, path_1):
with codecs.open(path_1, "r", "utf-8") as file:
data = file.read()
return sent_tokenize(data)
def correct_spelling(self, data):
data = TextBlob(data)
return str(data.correct())
def run_clean(self, path_1):
pool = Pool()
data = self.load_data(path_1)
return pool.amap(self.correct_spelling, data)
if __name__ == "__main__":
path_1 = "../Data/training_data/training_corpus.txt"
SpellCorrect = SpellCorrect()
result = SpellCorrect.run_clean(path_1)
print(result)
result = " ".join(temp …
Run Code Online (Sandbox Code Playgroud) python multiprocessing python-3.x python-multiprocessing pathos
执行下面的代码时出现错误。问题似乎是map
不支持接受多个输入的函数,就像在 python 内置multiprocessing
包中一样。但在内置包中,有一个starmap
可以解决这个问题。pathos.multiprocessing
有相同的吗?
import pathos.multiprocessing as mp
class Bar:
def foo(self, name):
return len(str(name))
def boo(self, x, y, z):
sum = self.foo(x)
sum += self.foo(y)
sum += self.foo(z)
return sum
if __name__ == '__main__':
b = Bar()
pool = mp.ProcessingPool()
results = pool.map(b.boo, [(12, 3, 456), (8, 9, 10), ('a', 'b', 'cde')])
print(results)
Run Code Online (Sandbox Code Playgroud)
类型错误:boo() 缺少 2 个必需的位置参数:“y”和“z”
按照建议更新 lambda 表达式(不起作用):
if __name__ == '__main__':
b = Bar()
pool = mp.ProcessingPool()
results = …
Run Code Online (Sandbox Code Playgroud) 我正在使用pathos.multiprocessing来并行化需要使用实例方法的程序.这是一个最小的工作示例:
import time
import numpy as np
from pathos.multiprocessing import Pool, ProcessingPool, ThreadingPool
class dummy(object):
def __init__(self, arg, key1=None, key2=-11):
np.random.seed(arg)
randnum = np.random.randint(0, 5)
print 'Sleeping {} seconds'.format(randnum)
time.sleep(randnum)
self.value = arg
self.more1 = key1
self.more2 = key2
args = [0, 10, 20, 33, 82]
keys = ['key1', 'key2']
k1val = ['car', 'borg', 'syria', 'aurora', 'libera']
k2val = ['a', 'b', 'c', 'd', 'e']
allks = [dict(zip(keys, [k1val[i], k2val[i]])) for i in range(5)]
pool = ThreadingPool(4)
result = pool.map(dummy, args, …
Run Code Online (Sandbox Code Playgroud) 我有一个函数,它接收图像列表并在将 OCR 应用于图像后在列表中生成输出。我有另一个函数,通过使用多处理来控制此函数的输入。因此,当我只有一个列表(即没有多重处理)时,列表中的每个图像花费了大约 1 秒,但是当我将必须并行处理的列表增加到 4 时,每个图像花费了惊人的 13 秒。
为了了解问题的真正所在,我尝试创建一个最小的问题示例。在这里,我有两个函数eat25
,eat100
它们打开一个图像name
并将其提供给使用 API 的 OCR pytesseract
。eat25
做25次,eat100
做100次。
我的目标是在eat100
没有多处理和eat25
多处理(有 4 个进程)的情况下运行。理论上,eat100
如果我有 4 个独立的处理器(我有 2 个内核,每个内核有 2 个线程,因此 CPU(s) = 4(如果我错了,请纠正我),这应该比我少 4 倍的时间)。
但是当我看到代码在打印“Processing 0”4次后甚至没有响应时,所有的理论都被浪费了。不过,单处理器功能eat100
运行良好。
我测试了一个简单的范围立方函数,它在多处理中运行良好,所以我的处理器可以肯定运行良好。这里唯一的罪魁祸首可能是:
pytesseract
: 看到这个`
from pathos.multiprocessing import ProcessingPool
from time import time
from PIL import Image
import pytesseract as pt
def eat25(name):
for i in range(25):
print('Processing :'+str(i)) …
Run Code Online (Sandbox Code Playgroud) 我有一个清单的理解:
thingie=[f(a,x,c) for x in some_list]
Run Code Online (Sandbox Code Playgroud)
我正在并行化如下:
from multiprocessing import Pool
pool=Pool(processes=4)
thingie=pool.map(lambda x: f(a,x,c), some_list)
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7f60b3b0e9d8>:
attribute lookup <lambda> on __main__ failed
Run Code Online (Sandbox Code Playgroud)
我尝试安装pathos
显然可以解决此问题的软件包,但是当我尝试导入它时,出现错误:
ImportError: No module named 'pathos'
Run Code Online (Sandbox Code Playgroud) 我的代码呢
from pathos.multiprocessing import ProcessingPool
def myFunc(something):
thispool = ProcessingPool(nodes=Result.cores)
listOfResults = thispool.map(something)
return listOfResults
for i in range(1000):
myFunc(i)
Run Code Online (Sandbox Code Playgroud)
现在,在我实际涉及的更多代码中,内存使用量不断增长.代码应该不采取任何措施,但如果我使用12个内核运行它,这12个内核最初将占用近1mb内存,但在几个小时的运行时间内,每个内核需要几GB.
所以,我认为池会泄漏内存,我最好在每次迭代后关闭它:
def myFunc(something):
thispool = ProcessingPool(nodes=Result.cores)
listOfResults = thispool.map(something)
thispool.close()
thispool.join()
return listOfResults
Run Code Online (Sandbox Code Playgroud)
但是,现在,经过几次迭代,我得到了
ValueError: Pool not running
Run Code Online (Sandbox Code Playgroud)
在this pool.map()
行.如果我创建一个新的
test = ProcessingPool(nodes=4)
Run Code Online (Sandbox Code Playgroud)
并尝试运行test.map()
,我得到同样的错误.这很奇怪,我初始化了一个新的变量......确实pathos.processing.ProcessingPool
有一个独特的进程池的功能,如果我关闭一个,我关闭所有?
pathos.multiprocessing.ProcessingPool
在没有内存泄漏的情况下实现内部循环的正确方法是什么?
当我改用时multiprocessing.Pool
,不会出现问题.
我ProcessingPool.map()
遇到了与Pathos功能低效并行化的问题:在处理结束时,一个缓慢运行的工作人员按顺序处理列表中的最后一个任务,而其他工作人员则空闲。我认为这是由于任务列表的“分块”所致。
使用 Python 自己的时,multiprocessing.Pool
我可以通过chunksize=1
在调用map
. 但是,Pathos 不支持此论点,源代码表明这可能是开发人员的疏忽或待办事项:
return _pool.map(star(f), zip(*args)) # chunksize
Run Code Online (Sandbox Code Playgroud)
(来自 Pathos' multiprocessing.py
,第 137 行)
我想保留 Pathos,因为它能够与 lamdbas 一起工作。
有没有办法让块大小在Pathos 中运行?是否有使用 Patho 的其他记录不佳的池实现之一的解决方法?
我有可以在 Windows 上运行的 Python 代码,但是在 Linux 上运行时它就会挂起。我正在使用 JPype,因此我怀疑多个共享进程尝试使用同一管道访问 Java 可能存在一些问题(创建了不同的进程,但挂在 JPype 行)。有没有办法强制在 Pathos 中生成以复制 Windows 实现?(例如常规多处理库中的 set_start_method 或 get_context ?)
谢谢。
我试图在python下运行并行进程(在ubuntu上).
我开始使用多处理,它适用于简单的例子.
然后是泡菜错误,所以我切换到了悲.. 我对不同的选项感到困惑,所以写了一个非常简单的基准测试代码.
import multiprocessing as mp
from pathos.multiprocessing import Pool as Pool1
from pathos.pools import ParallelPool as Pool2
from pathos.parallel import ParallelPool as Pool3
import time
def square(x):
# calculate the square of the value of x
return x*x
if __name__ == '__main__':
dataset = range(0,10000)
start_time = time.time()
for d in dataset:
square(d)
print('test with no cores: %s seconds' %(time.time() - start_time))
nCores = 3
print('number of cores used: %s' %(nCores))
start_time = time.time()
p = mp.Pool(nCores) …
Run Code Online (Sandbox Code Playgroud) python parallel-processing multiprocessing parallelism-amdahl pathos
pathos ×10
python ×7
python-3.x ×2
dictionary ×1
linux ×1
python-3.4 ×1
python-3.5 ×1
spawn ×1
tesseract ×1