标签: python-multiprocessing

Python多处理PicklingError:无法pickle <type'function'>

很抱歉,我无法用更简单的示例重现错误,而且我的代码太复杂而无法发布.如果我在IPython shell而不是常规Python中运行程序,那么事情就会很顺利.

我查看了之前关于这个问题的一些注意事项.它们都是由在类函数中定义的pool to call函数引起的.但对我来说情况并非如此.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud)

我将不胜感激任何帮助.

更新:我挑选的功能是在模块的顶层定义的.虽然它调用包含嵌套函数的函数.即f()要求g()调用h()具有嵌套函数i(),和我打电话pool.apply_async(f).f(),g(),h()都在顶层定义.我用这个模式尝试了更简单的例子,但它确实有效.

python pickle multiprocessing python-multiprocessing

217
推荐指数
6
解决办法
19万
查看次数

multiprocessing.Pool:map_async和imap有什么区别?

我想学习如何使用Python的multiprocessing包,但我不明白之间的差别map_asyncimap.我注意到,这两个map_asyncimap是异步执行的.那我什么时候应该使用另一个呢?我应该如何检索返回的结果map_async

我应该使用这样的东西吗?

def test():
    result = pool.map_async()
    pool.close()
    pool.join()
    return result.get()

result=test()
for i in result:
    print i
Run Code Online (Sandbox Code Playgroud)

python multiprocessing python-multiprocessing

157
推荐指数
2
解决办法
7万
查看次数

如何恢复传递给multiprocessing.Process的函数的返回值?

在下面的示例代码中,我想恢复函数的返回值worker.我该怎么做呢?这个值存储在哪里?

示例代码:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs
Run Code Online (Sandbox Code Playgroud)

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
Run Code Online (Sandbox Code Playgroud)

我似乎无法在存储的对象中找到相关属性jobs.

提前谢谢,blz

python python-multiprocessing

143
推荐指数
9
解决办法
14万
查看次数

在多个进程之间共享结果队列

multiprocessing模块的文档显示了如何将队列传递给以multiprocessing.Process.开头的进程.但是,如何与异步工作进程共享队列apply_async?我不需要动态加入或其他任何东西,只是工人(反复)将结果报告回基地的一种方式.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))
Run Code Online (Sandbox Code Playgroud)

这失败了: RuntimeError: Queue objects should only be shared between processes through inheritance.我理解这意味着什么,我理解继承的建议,而不是要求pickle/unpickling(以及所有特殊的Windows限制).但如何我传递队列中一个可行的办法?我找不到一个例子,我尝试了几种以各种方式失败的替代品.请帮忙?

python queue parallel-processing multiprocessing python-multiprocessing

77
推荐指数
2
解决办法
4万
查看次数

什么时候应该调用multiprocessing.Pool.join?

我正在使用'multiprocess.Pool.imap_unordered'如下

from multiprocessing import Pool
pool = Pool()
for mapped_result in pool.imap_unordered(mapping_func, args_iter):
    do some additional processing on mapped_result
Run Code Online (Sandbox Code Playgroud)

我需要打电话pool.close还是pool.join在for循环之后?

python python-multiprocessing

63
推荐指数
2
解决办法
5万
查看次数

是否可以在子进程中运行函数而无需线程化或编写单独的文件/脚本.

import subprocess

def my_function(x):
    return x + 100

output = subprocess.Popen(my_function, 1) #I would like to pass the function object and its arguments
print output 
#desired output: 101
Run Code Online (Sandbox Code Playgroud)

我只找到了使用单独脚本打开子进程的文档.有谁知道如何传递函数对象甚至是一种简单的方法来传递函数代码?

python subprocess function popen python-multiprocessing

59
推荐指数
4
解决办法
7万
查看次数

Python多处理模块中ThreadPool与Pool之间的区别是什么

请告诉我之间的差异ThreadPool,并Poolmultiprocessing模块.当我尝试我的代码时,这是我看到的主要区别:

from multiprocessing import Pool
import os, time

print("hi outside of main()")

def hello(x):
    print("inside hello()")
    print("Proccess id: ", os.getpid())
    time.sleep(3)
    return x*x

if __name__ == "__main__":
    p = Pool(5)
    pool_output = p.map(hello, range(3))

    print(pool_output)
Run Code Online (Sandbox Code Playgroud)

我看到以下输出:

hi outside of main()
hi outside of main()
hi outside of main()
hi outside of main()
hi outside of main()
hi outside of main()
inside hello()
Proccess id:  13268
inside hello()
Proccess id:  11104
inside hello()
Proccess id: …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing threadpool python-3.x python-multiprocessing

54
推荐指数
1
解决办法
3万
查看次数

Python多处理:理解`chunksize`背后的逻辑

哪些因素决定了chunksize方法的最佳参数multiprocessing.Pool.map()?该.map()方法似乎使用任意启发式作为其默认的chunksize(如下所述); 是什么推动了这种选择,是否有基于某些特定情况/设置的更周到的方法?

示例 - 说我是:

  • 传递iterable.map()拥有约1500万个元素的元素;
  • 24个核的机器上工作,使用默认processes = os.cpu_count()multiprocessing.Pool().

我天真的想法是给每24个工人一个同样大小的块,即15_000_000 / 24625,000.大块应该在充分利用所有工人的同时减少营业额/管理费用.但似乎缺少给每个工人提供大批量的一些潜在缺点.这是不完整的图片,我错过了什么?


我的部分问题源于if chunksize=None:both .map().starmap()call 的默认逻辑,.map_async()如下所示:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing multiprocessing python-3.x python-multiprocessing

47
推荐指数
3
解决办法
6902
查看次数

多处理:map vs map_async

使用map和有map_async什么区别?将项目从列表分发到4个进程后,它们是否没有运行相同的功能?

那么假设两者都在运行异步和并行是错误的吗?

def f(x):
   return 2*x

p=Pool(4)
l=[1,2,3,4]
out1=p.map(f,l)
#vs
out2=p.map_async(f,l)
Run Code Online (Sandbox Code Playgroud)

python-2.7 python-multiprocessing

45
推荐指数
1
解决办法
4万
查看次数

了解多处理:Python中的共享内存管理,锁和队列

多处理是python中一个强大的工具,我想更深入地理解它.我想知道何时使用常规 队列以及何时使用多处理管理器在所有进程之间共享这些.

我提出了以下测试场景,其中包含四种不同的多处理条件:

  1. 使用池和NO管理器

  2. 使用池和管理器

  3. 使用单个流程和NO Manager

  4. 使用单个进程和Manager

工作

所有条件都执行作业功能the_job.the_job由一些由锁固定的印刷组成.此外,函数的输入只是放入队列(以查看它是否可以从队列中恢复).该输入是一个简单的索引idxrange(10)在称为主脚本创建start_scenario(在底部示出).

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' …
Run Code Online (Sandbox Code Playgroud)

python locking multiprocessing python-multiprocessing

38
推荐指数
1
解决办法
1万
查看次数