在Python多处理库中,是否有pool.map的变体支持多个参数?
text = "test"
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text,case),case, 1)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud) 我没有看到有关Pool.apply,Pool.apply_async和Pool.map的用例的明确示例.我主要是在用Pool.map
; 别人有什么好处?
很抱歉,我无法用更简单的示例重现错误,而且我的代码太复杂而无法发布.如果我在IPython shell而不是常规Python中运行程序,那么事情就会很顺利.
我查看了之前关于这个问题的一些注意事项.它们都是由在类函数中定义的pool to call函数引起的.但对我来说情况并非如此.
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud)
我将不胜感激任何帮助.
更新:我挑选的功能是在模块的顶层定义的.虽然它调用包含嵌套函数的函数.即f()
要求g()
调用h()
具有嵌套函数i()
,和我打电话pool.apply_async(f)
.f()
,g()
,h()
都在顶层定义.我用这个模式尝试了更简单的例子,但它确实有效.
现在我在框架中有一个中央模块,它使用Python 2.6 multiprocessing
模块生成多个进程.因为它使用multiprocessing
,所以有模块级多处理感知日志,LOG = multiprocessing.get_logger()
.根据文档,这个记录器具有进程共享锁,因此您不会sys.stderr
通过让多个进程同时写入来解决问题(或任何文件句柄).
我现在遇到的问题是框架中的其他模块不支持多处理.我看到它的方式,我需要使这个中央模块的所有依赖使用多处理感知日志记录.这在框架内很烦人,更不用说框架的所有客户了.我有没有想到的替代方案?
我想使用multiprocessing
的Pool.map()
功能,同时划分出工作.当我使用以下代码时,它工作正常:
import multiprocessing
def f(x):
return x*x
def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
if __name__== '__main__' :
go()
Run Code Online (Sandbox Code Playgroud)
但是,当我在面向对象的方法中使用它时,它不起作用.它给出的错误信息是:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
Run Code Online (Sandbox Code Playgroud)
当以下是我的主程序时会发生这种情况:
import someClass
if __name__== '__main__' :
sc = someClass.someClass()
sc.go()
Run Code Online (Sandbox Code Playgroud)
以下是我的someClass
课程:
import multiprocessing
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))
Run Code Online (Sandbox Code Playgroud)
任何人都知道问题可能是什么,或者一个简单的方法呢?
当我运行类似的东西:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
Run Code Online (Sandbox Code Playgroud)
它工作正常.但是,将此作为类的函数:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
Run Code Online (Sandbox Code Playgroud)
给我以下错误:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud)
我看过Alex Martelli的一篇文章处理同样的问题,但它不够明确.
我想学习如何使用Python的multiprocessing
包,但我不明白之间的差别map_async
和imap
.我注意到,这两个map_async
和imap
是异步执行的.那我什么时候应该使用另一个呢?我应该如何检索返回的结果map_async
?
我应该使用这样的东西吗?
def test():
result = pool.map_async()
pool.close()
pool.join()
return result.get()
result=test()
for i in result:
print i
Run Code Online (Sandbox Code Playgroud) Python的多处理包中队列和管道之间的根本区别是什么?
在什么情况下应该选择一个而不是另一个?什么时候使用有利Pipe()
?什么时候使用有利Queue()
?
如何使用python的多处理池处理KeyboardInterrupt事件?这是一个简单的例子:
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
sleep(1)
return i*i
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
# **** THIS PART NEVER EXECUTES. ****
pool.terminate()
print "You cancelled the program!"
sys.exit(1)
print "\nFinally, here are the results: ", results
if __name__ == "__main__":
go()
Run Code Online (Sandbox Code Playgroud)
当运行上面的代码时,KeyboardInterrupt
当我按下时会引发上升^C
,但是该过程只是挂起,我必须在外部杀死它.
我希望能够随时按下^C
并使所有进程正常退出.
multiprocessing ×10
python ×10
pickle ×3
pool ×2
concurrency ×1
logging ×1
performance ×1
pipe ×1
queue ×1