相关疑难解决方法(0)

如何恢复传递给multiprocessing.Process的函数的返回值?

在下面的示例代码中,我想恢复函数的返回值worker.我该怎么做呢?这个值存储在哪里?

示例代码:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs
Run Code Online (Sandbox Code Playgroud)

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
Run Code Online (Sandbox Code Playgroud)

我似乎无法在存储的对象中找到相关属性jobs.

提前谢谢,blz

python python-multiprocessing

143
推荐指数
9
解决办法
14万
查看次数

Python多处理:理解`chunksize`背后的逻辑

哪些因素决定了chunksize方法的最佳参数multiprocessing.Pool.map()?该.map()方法似乎使用任意启发式作为其默认的chunksize(如下所述); 是什么推动了这种选择,是否有基于某些特定情况/设置的更周到的方法?

示例 - 说我是:

  • 传递iterable.map()拥有约1500万个元素的元素;
  • 24个核的机器上工作,使用默认processes = os.cpu_count()multiprocessing.Pool().

我天真的想法是给每24个工人一个同样大小的块,即15_000_000 / 24625,000.大块应该在充分利用所有工人的同时减少营业额/管理费用.但似乎缺少给每个工人提供大批量的一些潜在缺点.这是不完整的图片,我错过了什么?


我的部分问题源于if chunksize=None:both .map().starmap()call 的默认逻辑,.map_async()如下所示:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing multiprocessing python-3.x python-multiprocessing

47
推荐指数
3
解决办法
6902
查看次数

Pool.apply_async():嵌套函数不执行

我开始熟悉 Python 的multiprocessing模块。以下代码按预期工作:

#outputs 0 1 2 3
from multiprocessing import Pool
def run_one(x):
    print x
    return

pool = Pool(processes=12)
for i in range(4):
    pool.apply_async(run_one, (i,))
pool.close()
pool.join() 
Run Code Online (Sandbox Code Playgroud)

但是,现在,如果我围绕上述代码包装一个函数,print则不会执行语句(或至少重定向输出):

#outputs nothing
def run():
    def run_one(x):
        print x
        return    

    pool = Pool(processes=12)
    for i in range(4):    
        pool.apply_async(run_one, (i,))
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

如果我将run_one定义移到之外run,则输出再次是预期的,当我调用时run()

#outputs 0 1 2 3
def run_one(x):
    print x
    return

def run():    
    pool = Pool(processes=12)
    for i in range(4):       
        pool.apply_async(run_one, …
Run Code Online (Sandbox Code Playgroud)

python asynchronous multiprocessing python-multiprocessing

4
推荐指数
1
解决办法
1944
查看次数

多处理池映射:AttributeError:无法腌制本地对象

我有一个与Python Multiprocessing Pool Map非常相似的问题 :AttributeError: Can't pickle local object

我想我明白问题出在哪里,我只是不知道如何解决它。“Pool.map”需要一个顶级函数作为输入。但我不知道如何重写这个问题:

一个简化的代码版本:

import os as os
from multiprocessing import Pool
import numpy as np
def opti_fun_data(prediction):
    def opti_fun(x):
        def error_fun(i):
            return error_fun_opti(x,prediction,i)
        try:
            pool = Pool(np.max([os.cpu_count()-1,1]))
            error = np.mean(pool.map(error_fun, range(M)))
        finally: # To make sure processes are closed in the end, even if errors happen
            pool.close()
            pool.join()
        return error
    return opti_fun

Run Code Online (Sandbox Code Playgroud)

如果我跑 opti_fun_data(prediction)(x0) 我得到

Can't pickle local object 'opti_fun_data.<locals>.opti_fun.<locals>.error_fun'

我是多处理库的新手,可以伸出援助之手。对于那些对一些背景感兴趣的人:我想为一堆不同的场景/预测最小化函数“opti_fun”。计算我的错误度量/基准(“error_fun_opti”)是安静的计算密集型,因此我尝试并行化这一步。

python mathematical-optimization multiprocessing python-multithreading python-multiprocessing

4
推荐指数
1
解决办法
4984
查看次数

multiprocessing pool example does not work and freeze the kernel

I'm trying to parallelize a script, but for an unknown reason the kernel just freeze without any errors thrown.

minimal working example:

from multiprocessing import Pool

def f(x):
  return x*x

p = Pool(6)
print(p.map(f, range(10)))   
Run Code Online (Sandbox Code Playgroud)

Interestingly, all works fine if I define my function in another file then import it. How can I make it work without the need of another file?

I work with spyder (anaconda) and I have the same result if I run my code from the …

python windows parallel-processing multiprocessing python-multiprocessing

3
推荐指数
1
解决办法
2904
查看次数

python multiprocessing:AttributeError:无法腌制本地对象

我在一个类中有一个方法来返回一个参数可能会改变的函数。

Interface函数接受两个参数,f和它的args。我想用mp.pool来加速它。但是,它返回一个错误。

from multiprocessing import Pool
# from multiprocess import Pool
# from pathos.multiprocessing import ProcessingPool as Pool
import pickle
import dill


class Temp:
    def __init__(self, a):
        self.a = a

    def test(self):
        def test1(x):
            return self.a + x

        return test1


def InterfaceFunc(f, x):
    mypool = Pool(4)
    return list(mypool.map(f, x))


if __name__ == "__main__":
    t1 = Temp(1).test()
    x = [1, 2, 3, 1, 2]

    res1 = list(map(t1, x))
    print(res1)

    res2 = InterfaceFunc(t1, x)
Run Code Online (Sandbox Code Playgroud)

它引发了同样的错误:

AttributeError: Can't pickle local object 'Temp.test.<locals>.test1'
Run Code Online (Sandbox Code Playgroud)

我尝试了 …

python

3
推荐指数
1
解决办法
5187
查看次数