我想使用multiprocessing的Pool.map()功能,同时划分出工作.当我使用以下代码时,它工作正常:
import multiprocessing
def f(x):
return x*x
def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
if __name__== '__main__' :
go()
Run Code Online (Sandbox Code Playgroud)
但是,当我在面向对象的方法中使用它时,它不起作用.它给出的错误信息是:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
Run Code Online (Sandbox Code Playgroud)
当以下是我的主程序时会发生这种情况:
import someClass
if __name__== '__main__' :
sc = someClass.someClass()
sc.go()
Run Code Online (Sandbox Code Playgroud)
以下是我的someClass课程:
import multiprocessing
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))
Run Code Online (Sandbox Code Playgroud)
任何人都知道问题可能是什么,或者一个简单的方法呢?
我不知道__setstate__和 __getstate__方法有什么关系,所以请帮我一个简单的例子.
我想multiprocessing在Python中使用该库.遗憾地multiprocessing使用pickle不支持闭包,lambdas或函数的函数__main__.所有这三个对我来说都很重要
In [1]: import pickle
In [2]: pickle.dumps(lambda x: x)
PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
Run Code Online (Sandbox Code Playgroud)
幸运的是,有dill一个更健壮的泡菜.显然dill在导入时执行魔术以使泡菜工作
In [3]: import dill
In [4]: pickle.dumps(lambda x: x)
Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...
Run Code Online (Sandbox Code Playgroud)
这非常令人鼓舞,特别是因为我无法访问多处理源代码.可悲的是,我仍然无法得到这个非常基本的例子
import multiprocessing as mp
import dill
p = mp.Pool(4)
print p.map(lambda x: x**2, range(10))
Run Code Online (Sandbox Code Playgroud)
为什么是这样?我错过了什么?究竟是multiprocessing+ dill组合的限制是什么?
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Temporary Edit for J.F Sebastian
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Exception …Run Code Online (Sandbox Code Playgroud) 我正在尝试将多处理与pandas数据帧一起使用,即将数据帧拆分为8个部分.使用apply(每个部分在不同的过程中处理)对每个部分应用一些功能.
编辑:这是我最终找到的解决方案:
import multiprocessing as mp
import pandas.util.testing as pdt
def process_apply(x):
# do some stuff to data here
def process(df):
res = df.apply(process_apply, axis=1)
return res
if __name__ == '__main__':
p = mp.Pool(processes=8)
split_dfs = np.array_split(big_df,8)
pool_results = p.map(aoi_proc, split_dfs)
p.close()
p.join()
# merging parts processed by different processes
parts = pd.concat(pool_results, axis=0)
# merging newly calculated parts to big_df
big_df = pd.concat([big_df, parts], axis=1)
# checking if the dfs were merged correctly
pdt.assert_series_equal(parts['id'], big_df['id'])
Run Code Online (Sandbox Code Playgroud) 我正在OSX上读一个网络摄像头,这个简单的脚本可以正常工作:
import cv2
camera = cv2.VideoCapture(0)
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms before going to the next frame
except KeyboardInterrupt:
# cleanup the camera and close any open windows
camera.release()
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
Run Code Online (Sandbox Code Playgroud)
我现在想要在一个单独的进程中阅读视频,我的脚本更长,并且在Linux上的单独进程中正确地读取视频:
import numpy as np
import time
import ctypes
import argparse
from multiprocessing …Run Code Online (Sandbox Code Playgroud) 有:
from twisted.internet import reactor
from scrapy.crawler import CrawlerProcess
Run Code Online (Sandbox Code Playgroud)
我总是成功地运行这个过程:
process = CrawlerProcess(get_project_settings())
process.crawl(*args)
# the script will block here until the crawling is finished
process.start()
Run Code Online (Sandbox Code Playgroud)
但是因为我已将此代码移动到web_crawler(self)函数中,如下所示:
def web_crawler(self):
# set up a crawler
process = CrawlerProcess(get_project_settings())
process.crawl(*args)
# the script will block here until the crawling is finished
process.start()
# (...)
return (result1, result2)
Run Code Online (Sandbox Code Playgroud)
并开始使用类实例化调用该方法,如:
def __call__(self):
results1 = test.web_crawler()[1]
results2 = test.web_crawler()[0]
Run Code Online (Sandbox Code Playgroud)
和运行:
test()
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
Traceback (most recent call last):
File "test.py", line 573, in <module> …Run Code Online (Sandbox Code Playgroud) 我在基于Intel i3的计算机上运行以下代码,该计算机具有4个虚拟核心(2个超线程/物理核心,64位)和安装的Ubuntu 14.04:
n = multiprocessing.cpu_count()
executor = ThreadPoolExecutor(n)
tuple_mapper = lambda i: (i, func(i))
results = dict(executor.map(tuple_mapper, range(10)))
Run Code Online (Sandbox Code Playgroud)
代码似乎没有以并行方式执行,因为CPU的使用率仅为25%.在利用率图表中,一次仅100%使用4个虚拟核心中的一个.使用的核心每10秒左右交替一次.
但是并行化在具有相同软件设置的服务器计算机上运行良好.我不知道核心的确切数量,也不知道确切的处理器类型,但我确信它有几个核心,利用率为100%,并且计算速度快(使用并行化后速度提高了10倍)一些实验用它).
我希望,并行化也可以在我的机器上运行,而不仅仅是在服务器上.
为什么不起作用?它与我的操作系统设置有关吗?我需要改变它们吗?
提前致谢!
更新: 有关背景信息,请参阅下面的正确答案.为了完整起见,我想提供一个解决问题的示例代码:
tuple_mapper = lambda i: (i, func(i))
n = multiprocessing.cpu_count()
with concurrent.futures.ProcessPoolExecutor(n) as executor:
results = dict(executor.map(tuple_mapper, range(10)))
Run Code Online (Sandbox Code Playgroud)
在重用之前,请注意您正在使用的所有函数都在模块的顶层定义,如下所述: Python多处理酸洗错误
正如您从标题中所知,我正在尝试使用PriorityQueue进行多处理.更确切地说,我想制作共享的PriorityQueue,编写了一些代码并且它没有像我预期的那样运行.
看看代码:
import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue
def worker(queue):
lock = Lock()
with lock:
for i in range(100):
queue.put(i)
print "worker", queue.qsize()
pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()
time.sleep(5) # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
Run Code Online (Sandbox Code Playgroud)
得到以下输出:
worker 100
main 0
Run Code Online (Sandbox Code Playgroud)
发生了什么以及如何以正确的方式做我想做的事情?谢谢.
背景
我想使用带有Inception-Resnet_v2的keras预测病理图像。我已经训练了模型,并得到了一个.hdf5文件。因为病理图像非常大(例如:20,000 x 20,000像素),所以我必须扫描图像以获得小的斑块进行预测。
我想使用带有python2.7的多处理库来加快预测过程。主要思想是使用不同的子流程扫描不同的线,然后发送补丁进行建模。
我看到有人建议在子流程中导入keras和加载模型。但是我认为这不适合我的任务。一次加载模型keras.models.load_model()大约需要47秒,这非常耗时。因此,我无法在每次启动新的子流程时都重新加载模型。
题
我的问题是我可以在主流程中加载模型并将其作为参数传递给子流程吗?
我尝试了两种方法,但它们均无效。
方法1。使用multiprocessing.Pool
代码是:
import keras
from keras.models import load_model
import multiprocessing
def predict(num,model):
print dir(model)
print num
model.predict("image data, type:list")
if __name__ == '__main__':
model = load_model("path of hdf5 file")
list = [(1,model),(2,model),(3,model),(4,model),(5,model),(6,model)]
pool = multiprocessing.Pool(4)
pool.map(predict,list)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
输出是
cPickle.PicklingError: Can't pickle <type 'module'>: attribute lookup __builtin__.module failed
Run Code Online (Sandbox Code Playgroud)
我搜索了该错误,发现Pool无法映射无法选择的参数,因此我尝试了方法2。
方法二。使用multiprocessing.Process
该代码是
import keras
from keras.models import load_model
import multiprocessing
def predict(num,model):
print num
print dir(model)
model.predict("image data, …Run Code Online (Sandbox Code Playgroud) 所以,RQ明确指出我可以在这里排队一个对象的实例方法,所以我一直试图这样做,但得到一个PicklingError:
q.enqueue(some_obj.some_func, some_data)
*** PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud)
真的,我只需要在我的方法中访问SQL连接,所以我试着让它成为一个明确接受SQL连接的函数.那也失败了:
q.enqueue(some_func, sql_sess, some_data)
*** PicklingError: Can't pickle <class 'sqlalchemy.orm.session.Session'>: it's not the same object as sqlalchemy.orm.session.Session
Run Code Online (Sandbox Code Playgroud)
我该如何解决这个问题?我做错了什么,或者图书馆坏了吗?