很抱歉,我无法用更简单的示例重现错误,而且我的代码太复杂而无法发布.如果我在IPython shell而不是常规Python中运行程序,那么事情就会很顺利.
我查看了之前关于这个问题的一些注意事项.它们都是由在类函数中定义的pool to call函数引起的.但对我来说情况并非如此.
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud)
我将不胜感激任何帮助.
更新:我挑选的功能是在模块的顶层定义的.虽然它调用包含嵌套函数的函数.即f()要求g()调用h()具有嵌套函数i(),和我打电话pool.apply_async(f).f(),g(),h()都在顶层定义.我用这个模式尝试了更简单的例子,但它确实有效.
我想学习如何使用Python的multiprocessing包,但我不明白之间的差别map_async和imap.我注意到,这两个map_async和imap是异步执行的.那我什么时候应该使用另一个呢?我应该如何检索返回的结果map_async?
我应该使用这样的东西吗?
def test():
result = pool.map_async()
pool.close()
pool.join()
return result.get()
result=test()
for i in result:
print i
Run Code Online (Sandbox Code Playgroud) 在下面的示例代码中,我想恢复函数的返回值worker.我该怎么做呢?这个值存储在哪里?
示例代码:
import multiprocessing
def worker(procnum):
'''worker function'''
print str(procnum) + ' represent!'
return procnum
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print jobs
Run Code Online (Sandbox Code Playgroud)
输出:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
Run Code Online (Sandbox Code Playgroud)
我似乎无法在存储的对象中找到相关属性jobs.
提前谢谢,blz
该multiprocessing模块的文档显示了如何将队列传递给以multiprocessing.Process.开头的进程.但是,如何与异步工作进程共享队列apply_async?我不需要动态加入或其他任何东西,只是工人(反复)将结果报告回基地的一种方式.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Run Code Online (Sandbox Code Playgroud)
这失败了:
RuntimeError: Queue objects should only be shared between processes through inheritance.我理解这意味着什么,我理解继承的建议,而不是要求pickle/unpickling(以及所有特殊的Windows限制).但如何做我传递队列中一个可行的办法?我找不到一个例子,我尝试了几种以各种方式失败的替代品.请帮忙?
python queue parallel-processing multiprocessing python-multiprocessing
我正在使用'multiprocess.Pool.imap_unordered'如下
from multiprocessing import Pool
pool = Pool()
for mapped_result in pool.imap_unordered(mapping_func, args_iter):
do some additional processing on mapped_result
Run Code Online (Sandbox Code Playgroud)
我需要打电话pool.close还是pool.join在for循环之后?
import subprocess
def my_function(x):
return x + 100
output = subprocess.Popen(my_function, 1) #I would like to pass the function object and its arguments
print output
#desired output: 101
Run Code Online (Sandbox Code Playgroud)
我只找到了使用单独脚本打开子进程的文档.有谁知道如何传递函数对象甚至是一种简单的方法来传递函数代码?
请告诉我之间的差异ThreadPool,并Pool在multiprocessing模块.当我尝试我的代码时,这是我看到的主要区别:
from multiprocessing import Pool
import os, time
print("hi outside of main()")
def hello(x):
print("inside hello()")
print("Proccess id: ", os.getpid())
time.sleep(3)
return x*x
if __name__ == "__main__":
p = Pool(5)
pool_output = p.map(hello, range(3))
print(pool_output)
Run Code Online (Sandbox Code Playgroud)
我看到以下输出:
hi outside of main()
hi outside of main()
hi outside of main()
hi outside of main()
hi outside of main()
hi outside of main()
inside hello()
Proccess id: 13268
inside hello()
Proccess id: 11104
inside hello()
Proccess id: …Run Code Online (Sandbox Code Playgroud) python multiprocessing threadpool python-3.x python-multiprocessing
哪些因素决定了chunksize方法的最佳参数multiprocessing.Pool.map()?该.map()方法似乎使用任意启发式作为其默认的chunksize(如下所述); 是什么推动了这种选择,是否有基于某些特定情况/设置的更周到的方法?
示例 - 说我是:
iterable到.map()拥有约1500万个元素的元素;processes = os.cpu_count()内multiprocessing.Pool().我天真的想法是给每24个工人一个同样大小的块,即15_000_000 / 24625,000.大块应该在充分利用所有工人的同时减少营业额/管理费用.但似乎缺少给每个工人提供大批量的一些潜在缺点.这是不完整的图片,我错过了什么?
我的部分问题源于if chunksize=None:both .map()和.starmap()call 的默认逻辑,.map_async()如下所示:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
# ... (materialize `iterable` to list if it's an iterator)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4) # ????
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = …Run Code Online (Sandbox Code Playgroud) python parallel-processing multiprocessing python-3.x python-multiprocessing
使用map和有map_async什么区别?将项目从列表分发到4个进程后,它们是否没有运行相同的功能?
那么假设两者都在运行异步和并行是错误的吗?
def f(x):
return 2*x
p=Pool(4)
l=[1,2,3,4]
out1=p.map(f,l)
#vs
out2=p.map_async(f,l)
Run Code Online (Sandbox Code Playgroud) 多处理是python中一个强大的工具,我想更深入地理解它.我想知道何时使用常规 锁和队列以及何时使用多处理管理器在所有进程之间共享这些.
我提出了以下测试场景,其中包含四种不同的多处理条件:
使用池和NO管理器
使用池和管理器
使用单个流程和NO Manager
使用单个进程和Manager
所有条件都执行作业功能the_job.the_job由一些由锁固定的印刷组成.此外,函数的输入只是放入队列(以查看它是否可以从队列中恢复).该输入是一个简单的索引idx从range(10)在称为主脚本创建start_scenario(在底部示出).
def the_job(args):
"""The job for multiprocessing.
Prints some stuff secured by a lock and
finally puts the input into a queue.
"""
idx = args[0]
lock = args[1]
queue=args[2]
lock.acquire()
print 'I'
print 'was '
print 'here '
print '!!!!'
print '1111'
print 'einhundertelfzigelf\n'
who= ' By run %d \n' …Run Code Online (Sandbox Code Playgroud) python ×9
python-3.x ×2
function ×1
locking ×1
pickle ×1
popen ×1
python-2.7 ×1
queue ×1
subprocess ×1
threadpool ×1