相关疑难解决方法(0)

如何在 Python 多处理中使用 boto3 客户端?

代码如下所示:

import multiprocessing as mp
from functools import partial

import boto3
import numpy as np


s3 = boto3.client('s3')

def _something(**kwargs):
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)


def do(s3):
    archive = np.load(s3.get_object('some_key')) # Simplified -- details not relevant
    pool = mp.pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)

    out = pool.starmap(sub_process, [x for x in zip(parts, target_parts)] # Error occurs at this line

    pool.close()
    pool.join()

do(s3)
Run Code Online (Sandbox Code Playgroud)

错误:

_pickle.PicklingError: Can't pickle …
Run Code Online (Sandbox Code Playgroud)

python python-multiprocessing boto3 starmap

10
推荐指数
1
解决办法
1万
查看次数

使用copy_reg处理classmethod酸洗问题

在处理多处理时遇到了酸洗错误:

 from multiprocessing import Pool

 def test_func(x):
     return x**2

 class Test:
     @classmethod
     def func(cls, x):
         return x**2

 def mp_run(n, func, args):
     return Pool(n).map(func, args)

 if __name__ == '__main__':
     args = range(1,6)

     print mp_run(5, test_func, args)
     # [1, 4, 9, 16, 25]

     print mp_run(5, Test.func, args)
     """
     Exception in thread Thread-3:
     Traceback (most recent call last):
       File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
         self.run()
       File "/usr/lib64/python2.6/threading.py", line 484, in run
         self.__target(*self.__args, **self.__kwargs)
       File "/usr/lib64/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
         put(task)
     PicklingError: Can't pickle …
Run Code Online (Sandbox Code Playgroud)

python pickle multiprocessing

9
推荐指数
1
解决办法
3757
查看次数

如何避免这种酸洗错误,以及在Python中并行化此代码的最佳方法是什么?

我有以下代码.

def main():
  (minI, maxI, iStep, minJ, maxJ, jStep, a, b, numProcessors) = sys.argv
  for i in range(minI, maxI, iStep):
    for j in range(minJ, maxJ, jStep): 
      p = multiprocessing.Process(target=functionA, args=(minI, minJ))
      p.start()
      def functionB((a, b)):
        subprocess.call('program1 %s %s %s %s %s %s' %(c, a, b, 'file1', 
          'file2', 'file3'), shell=True)
        for d in ['a', 'b', 'c']:
          subprocess.call('program2 %s %s %s %s %s' %(d, 'file4', 'file5', 
            'file6', 'file7'), shell=True)
      abProduct = list(itertools.product(range(0, 10), range(0, 10)))
      pool = multiprocessing.Pool(processes=numProcessors)
      pool.map(functionB, abProduct) 
Run Code Online (Sandbox Code Playgroud)

它会产生以下错误.

Exception …
Run Code Online (Sandbox Code Playgroud)

python pickle multiprocessing

8
推荐指数
1
解决办法
2万
查看次数

不能泡菜功能

所以我试图通过做一点多处理来加快我的计算时间

我正在尝试使用池工人.

在我的代码的顶部,我有

import Singal as s
import multiprocessing as mp
def wrapper(Channel):
    Noise_Frequincies = []
    for i in range(1,125):
        Noise_Frequincies.append(60.0*float(i))
    Noise_Frequincies.append(180.0)
    filter1 = s.Noise_Reduction(Sample_Rate,Noise_Frequincies,Channel)
    return filter1
Run Code Online (Sandbox Code Playgroud)

然后到时候我用

Both_Channels = [Chan1, Chan2]
results = mp.Pool(2).map(wrapper,Both_Channels)
filter1 = results[0]
filter2 = results[1]
Run Code Online (Sandbox Code Playgroud)

我收到以下错误

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup …
Run Code Online (Sandbox Code Playgroud)

python pool multiprocessing

8
推荐指数
1
解决办法
2万
查看次数

python多处理的替代使用模式避免了全局状态的扩散?

这个(非常简化的例子)工作正常(Python 2.6.6,Debian Squeeze):

from multiprocessing import Pool
import numpy as np

src=None

def process(row):
    return np.sum(src[row])

def main():
    global src
    src=np.ones((100,100))

    pool=Pool(processes=16)
    rows=pool.map(process,range(100))
    print rows

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

然而,经过多年的教育,全球状态不好!!! ,我所有的直觉告诉我,我真的宁愿写更接近的东西:

from multiprocessing import Pool
import numpy as np

def main():
    src=np.ones((100,100))

    def process(row):
        return np.sum(src[row])

    pool=Pool(processes=16)
    rows=pool.map(process,range(100))
    print rows

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

但当然这不起作用(挂起无法腌制的东西).

这里的例子是微不足道的,但是当你添加多个"进程"函数时,每个函数都依赖于多个额外的输入......这一切都让人想起30年前用BASIC编写的东西.尝试使用类来至少使用适当的函数聚合状态似乎是一个明显的解决方案,但在实践中似乎并不那么容易.

是否有一些推荐的模式或样式使用multiprocessing.pool,这将避免全局状态的扩散,以支持我想并行映射的每个函数?

经验丰富的"多处理专业人员"如何处理这个问题?

更新:请注意,我实际上对处理更大的数组感兴趣,因此上面的src每个调用/迭代的变化都不如将其分配到池的工作进程中.

python pool multiprocessing

7
推荐指数
1
解决办法
1989
查看次数

Spark返回Pickle错误:无法查找属性

尝试在我的RDD中启动一个类时,我正在运行一些属性查找问题.

我的工作流程

1-从RDD开始

2-获取RDD的每个元素,为每个元素启动一个对象

3- Reduce(我将编写一个稍后将定义reduce操作的方法)

这是#2:

>class test(object):
def __init__(self, a,b):
    self.total = a + b

>a = sc.parallelize([(True,False),(False,False)])
>a.map(lambda (x,y): test(x,y))
Run Code Online (Sandbox Code Playgroud)

这是我得到的错误:

PicklingError:不能咸菜<类的.测试'>:属性查找.测试失败

我想知道它是否有任何解决方法.请回答一个工作示例来实现预期的结果(即创建类"测试"对象的RDD).

相关问题:

python pickle apache-spark

7
推荐指数
1
解决办法
5216
查看次数

Python多处理池 - 迭代对象方法?

也许更熟悉Python的多处理池代码的人可以帮助我.我试图通过套接字连接同时连接到我的网络上的几个主机(任何时候N)并执行一些RPC.当一个主机完成时,我想将下一个主机添加到池中以运行直到所有主机都完成.

我有一个类,HClass,有一些方法可以这样做,还有一个主机名列表中包含的主机名.但我没有找到任何一个docs.python.org的例子来使这个工作.

一小段代码来说明我到目前为止所获得的内容:

hostlist = [h1, h2, h3, h4, ....]
poolsize = 2

class HClass:
  def __init__(self, hostname="default"):
    self.hostname = hostname

  def go(self):
      # do stuff
      # do more stuff
  ....

if __name__ == "__main__":
  objs = [HClass(hostname=current_host) for current_host in hostlist]
  pool = multiprocessing.pool(poolsize)
  results = pool.apply_async(objs.go())
Run Code Online (Sandbox Code Playgroud)

到目前为止,我很幸运有这个追溯:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task) …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing

6
推荐指数
1
解决办法
6632
查看次数

这是pickle实例方法的正确方法吗?如果是的话,为什么不在Python 3中呢?

实例方法不能在Python 2或Python 3中自动腌制.

我需要使用Python 3 挑选实例方法,并将Steven Bethard的示例代码移植到Python 3:

import copyreg
import types

def _pickle_method(method):
    func_name = method.__func__.__name__
    obj = method.__self__
    cls = method.__self__.__class__
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method)
Run Code Online (Sandbox Code Playgroud)

这种方法是否适用于酸洗实例方法?或者有些事情可怕的错误?我用一些模拟类测试了它,一切似乎都有效.

如果什么都不会出错,为什么Python 3中的标准pickle实例方法不可能?

pickle instance-methods python-3.x

5
推荐指数
1
解决办法
1313
查看次数

带有RPYC的多处理Python“ ValueError:禁用酸洗”

我试图在rpyc服务中使用多处理程序包,但是ValueError: pickling is disabled当我尝试从客户端调用公开函数时会得到提示。我知道该multiprocesing程序包使用酸洗在进程之间传递信息,并且不允许酸洗,rpyc因为这是不安全的协议。因此,我不确定将多处理与rpyc一起使用的最佳方法(或者是否存在)。如何在rpyc服务中使用多重处理?这是服务器端代码:

import rpyc
from multiprocessing import Pool

class MyService(rpyc.Service):

    def exposed_RemotePool(self, function, arglist):

        pool = Pool(processes = 8)
        result = pool.map(function, arglist)
        pool.close()
        return result


if __name__ == "__main__":
    from rpyc.utils.server import ThreadedServer
    t = ThreadedServer(MyService, port = 18861)
    t.start()
Run Code Online (Sandbox Code Playgroud)

这是产生错误的客户端代码:

import rpyc

def square(x):
    return x*x

c = rpyc.connect("localhost", 18861)
result = c.root.exposed_RemotePool(square, [1,2,3,4])
print(result)
Run Code Online (Sandbox Code Playgroud)

python multiprocessing rpyc python-multiprocessing

5
推荐指数
1
解决办法
1314
查看次数

Python 类方法并行化

我需要提升我的 python 应用程序。解决方案应该是微不足道的:

import time
from multiprocessing import Pool


class A:
    def method1(self):
        time.sleep(1)
        print('method1')
        return 'method1'

    def method2(self):
        time.sleep(1)
        print('method2')
        return 'method2'

    def method3(self):
        pool = Pool()
        time1 = time.time()
        res1 = pool.apply_async(self.method1, [])
        res2 = pool.apply_async(self.method2, [])
        res1 = res1.get()
        res2 = res2.get()
        time2 = time.time()
        print('res1 = {0}'.format(res1))
        print('res2 = {0}'.format(res2))
        print('time = {0}'.format(time2 - time1))



a = A()
a.method3()
Run Code Online (Sandbox Code Playgroud)

但是每次我启动这个简单的程序时,我都会遇到一个异常:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python3.2/threading.py", line 740, in _bootstrap_inner
    self.run() …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing

5
推荐指数
0
解决办法
1863
查看次数

PicklingError:无法使用python进程池执行程序对&lt;type'function'&gt;进行酸洗

实用程序

def exec_multiprocessing(self, method, args):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = pool.map(method, args)
        return results
Run Code Online (Sandbox Code Playgroud)

clone.py

def clone_vm(self, name, first_run, host, ip):
    # clone stuff
Run Code Online (Sandbox Code Playgroud)

invoke.py

exec_args = [(name, first_run, host, ip) for host, ip in zip(hosts, ips)]
results = self.util.exec_multiprocessing(self.clone.clone_vm, exec_args)
Run Code Online (Sandbox Code Playgroud)

上面的代码给出了酸洗错误。我发现这是因为我们正在传递实例方法。因此,我们应该解开实例方法。但是我无法使其工作。

注意:我无法创建顶级方法来避免这种情况。我必须使用实例方法。

python python-multiprocessing

5
推荐指数
1
解决办法
4220
查看次数

不能腌制本地物体

我想在此代码中使用多进程包.我试图调用该函数create_new_population并将数据分发到8个处理器,但是当我这样做时,我得到了pickle错误.

通常函数会像这样运行: self.create_new_population(self.pop_size)

我尝试像这样分发工作:

f= self.create_new_population
pop = self.pop_size/8
self.current_generation = [pool.apply_async(f, pop) for _ in range(8)]
Run Code Online (Sandbox Code Playgroud)

我得到 或Can't pickle local object 'exhaust.__init__.<locals>.tour_select'
PermissionError: [WinError 5] Access is denied

我仔细阅读了这个帖子,并尝试使用Steven Bethard的方法绕过错误,允许通过copyreg进行方法酸洗/ 取消:

def _pickle_method(method)
def _unpickle_method(func_name, obj, cls)
Run Code Online (Sandbox Code Playgroud)

我还尝试使用pathos包没有任何运气.
我知道应该在
if __name__ == '__main__':块下调用代码,但我想知道是否可以在代码中尽可能少的更改来完成.

python parallel-processing multithreading python-multiprocessing pathos

5
推荐指数
0
解决办法
2353
查看次数