我想知道python的Multiprocessing.Pool类使用map,imap和map_async的方式.我的特殊问题是我想在一个迭代器上映射,该迭代器创建了占用大量内存的对象,并且不希望所有这些对象同时生成到内存中.我想看看各种map()函数是否会使我的迭代器变干,或者只是在子进程缓慢前进时智能地调用next()函数,所以我这样修改了一些测试:
def g():
for el in xrange(100):
print el
yield el
def f(x):
time.sleep(1)
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
go = g()
g2 = pool.imap(f, go)
g2.next()
Run Code Online (Sandbox Code Playgroud)
依此类推map,imap和map_async.这是最公然的例子,因为简单地在g2上单次调用next()会从我的生成器g()中打印出所有元素,而如果imap这样做'懒惰',我希望它只调用go.next ()一次,因此只打印'1'.
有人可以清理正在发生的事情,并且如果有某种方法让进程池'懒惰'根据需要评估迭代器吗?
谢谢,
加布
我是Python的多处理新手,试图弄清楚我是否应该使用Pool或Process来调用两个函数async.我有两个函数进行curl调用并将信息解析为2个单独的列表.根据互联网连接,每个功能可能需要大约4秒.我意识到瓶颈在于ISP连接,多处理不会加速它,但让它们开始异步会很好.另外,对于我来说,进入python的多处理是一个很好的学习经验,因为我将在以后使用它.
我读过Python multiprocessing.Pool:何时使用apply,apply_async或map?它很有用,但仍然有我自己的问题.
所以我能做到的一种方法是:
def foo():
pass
def bar():
pass
p1 = Process(target=foo, args=())
p2 = Process(target=bar, args=())
p1.start()
p2.start()
p1.join()
p2.join()
Run Code Online (Sandbox Code Playgroud)
我对此实现的问题是:1)由于连接阻塞直到调用进程完成...这是否意味着p1进程必须在p2进程启动之前完成?我总是理解.join()与pool.apply()和pool.apply_sync().get()相同,其中父进程在当前运行完成之前无法启动另一个进程(任务).
另一种选择是:
def foo():
pass
def bar():
pass
pool = Pool(processes=2)
p1 = pool.apply_async(foo)
p1 = pool.apply_async(bar)
Run Code Online (Sandbox Code Playgroud)
我对此实现的问题是:1)我是否需要pool.close(),pool.join()?2)在我得到结果之前,pool.map()会使它们全部完整吗?如果是这样的话,他们还在跑吗?3)pool.apply_async()与使用pool.apply()执行每个进程有何不同4)这与使用Process的先前实现有何不同?
我试图将多处理添加到一些代码中,这些代码具有我无法修改的功能.我想将这些函数作为作业异步提交到多处理池.我正在做的事情很像这里显示的代码.但是,我不确定如何跟踪结果.如何知道返回结果对应的应用函数?
要强调的重点是我无法修改现有函数(其他依赖于它们的东西保持原样),并且结果可以按照与函数作业应用于池的顺序不同的顺序返回.
感谢您的任何想法!
编辑:一些尝试代码如下:
import multiprocessing
from multiprocessing import Pool
import os
import signal
import time
import inspect
def multiply(multiplicand1=0, multiplicand2=0):
return multiplicand1*multiplicand2
def workFunctionTest(**kwargs):
time.sleep(3)
return kwargs
def printHR(object):
"""
This function prints a specified object in a human readable way.
"""
# dictionary
if isinstance(object, dict):
for key, value in sorted(object.items()):
print u'{a1}: {a2}'.format(a1=key, a2=value)
# list or tuple
elif isinstance(object, list) or isinstance(object, tuple):
for element in object:
print element
# other
else: …Run Code Online (Sandbox Code Playgroud) 我正在处理一些ascii-data,进行一些操作,然后将所有内容写回另一个文件(完成的工作post_processing_0.main,不返回任何内容).我想将代码与多处理模块并行化,请参阅以下代码片段:
from multiprocessing import Pool
import post_processing_0
def chunks(lst,n):
return [ lst[i::n] for i in xrange(n) ]
def main():
pool = Pool(processes=proc_num)
P={}
for i in range(0,proc_num):
P['process_'+str(i)]=pool.apply_async(post_processing_0.main, [split_list[i]])
pool.close()
pool.join()
proc_num=8
timesteps=100
list_to_do=range(0,timesteps)
split_list=chunks(list_to_do,proc_num)
main()
Run Code Online (Sandbox Code Playgroud)
我读了地图和异步之间的区别,但我不太了解它.我的多处理模块的应用是否正确?
在这种情况下,我应该使用map_async还是apply_async?为什么?
编辑:
我不认为这是Python multiprocessing.Pool问题的重复:何时使用apply,apply_async或map?.在问题中,答案集中在使用这两个函数可以获得的结果的顺序.我在这里问:没有任何东西返回时有什么区别?
我正在尝试为py3.3探索Python的多处理库,我注意到map_async函数中有一个奇怪的结果,我一直无法解释.我一直期待从回调中存储的结果"乱序".也就是说,如果我将多个任务提供给工作进程,则有些任务应该先于其他任务完成,而不一定按照它们被输入或存在于输入列表中的相同顺序.但是,我得到了一组有序的结果,与输入的任务完全一致.即使在故意试图通过减慢某些过程来"破坏"某些过程(这可能会让其他人在它之前完成)之后就是这种情况.
我在calculate函数中有一个print语句,显示它们是按顺序计算的,但结果仍然是有序的.虽然我不确定我是否可以相信印刷声明是一个很好的指标,事实上实际上是在计算乱序.
测试过程(一般示例):构建一个对象列表,每个对象都包含一个整数.将该对象列表作为参数提交给map_async,以及更新对象的numValue属性的函数"calculate",其值为平方值.然后"calculate"函数返回具有更新值的对象.
一些代码:
import time
import multiprocessing
import random
class NumberHolder():
def __init__(self,numValue):
self.numValue = numValue #Only one attribute
def calculate(obj):
if random.random() >= 0.5:
startTime = time.time()
timeWaster = [random.random() for x in range(5000000)] #Waste time.
endTime = time.time() #Establish end time
print("%d object got stuck in here for %f seconds"%(obj.numValue,endTime-startTime))
#Main Process
if __name__ == '__main__':
numbersToSquare = [x for x in range(0,100)] #I'm
taskList = []
for eachNumber …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Python的multiprocessing.Pool模块优化我的代码,但我没有得到我逻辑上期望的加速结果.
我正在做的主要方法涉及计算大量向量的矩阵向量积和固定的大稀疏矩阵.下面是一个玩具示例,它执行我需要的,但随机矩阵.
import time
import numpy as np
import scipy.sparse as sp
def calculate(vector, matrix = None):
for i in range(50):
v = matrix.dot(vector)
return v
if __name__ == '__main__':
N = 1e6
matrix = sp.rand(N, N, density = 1e-5, format = 'csr')
t = time.time()
res = []
for i in range(10):
res.append(calculate(np.random.rand(N), matrix = matrix))
print time.time() - t
Run Code Online (Sandbox Code Playgroud)
该方法在大约30几秒钟内终止.
现在,由于每个元素的计算results不依赖于任何其他计算的结果,因此很自然地认为并行计算将加速该过程.我们的想法是创建4个流程,如果每个流程都进行一些计算,那么所有流程完成所需的时间应该减少一些因素4.为此,我编写了以下代码:
import time
import numpy as np
import scipy.sparse as …Run Code Online (Sandbox Code Playgroud) 我有以下情况:
for x1 in range(x1, x2):
for x2 in range(x3, x4):
for x3 ...
...
f(x1, x2, x3, ...)
Run Code Online (Sandbox Code Playgroud)
如何将其转换为一种机制,在这种机制中我只告诉python制作n个嵌套循环,其中变量名是x1,x2,x3,x4,...?我当然不想手动编写所有可能性,因为可能有很多维度.
我有一个总和,我正在尝试计算,我很难并行化代码.我试图并行化的计算有点复杂(它使用numpy数组和scipy稀疏矩阵).它吐出一个numpy数组,我想从大约1000个计算中求和输出数组.理想情况下,我会在所有迭代中保持运行总和.但是,我还没弄清楚如何做到这一点.
到目前为止,我已经尝试使用joblib的Parallel函数和pool.map函数与python的多处理包.对于这两个,我使用一个返回numpy数组的内部函数.这些函数返回一个列表,我将其转换为numpy数组,然后求和.
但是,在joblib并行函数完成所有迭代后,主程序永远不会继续运行(看起来原始作业处于挂起状态,使用0%CPU).当我使用pool.map时,在所有迭代完成后我得到内存错误.
有没有办法简单地并行化运行的数组总和?
编辑:目标是做以下的事情,除了并行.
def summers(num_iters):
sumArr = np.zeros((1,512*512)) #initialize sum
for index in range(num_iters):
sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array
return sumArr
Run Code Online (Sandbox Code Playgroud) 简短的问题:是否有可能有N工作流程和一个balancer流程,它会找到此时什么都不做的工人并将其传递UnitOfWork给它?
长问题:想象一下这样的课程,女巫将被子类化以执行某些任务:
class UnitOfWork:
def __init__(self, **some_starting_parameters):
pass
def init(self):
# open connections, etc.
def run(self):
# do the job
Run Code Online (Sandbox Code Playgroud)
启动平衡器和工作进程:
balancer = LoadBalancer()
workers = balancer.spawn_workers(10)
Run Code Online (Sandbox Code Playgroud)
部署工作(平衡器应该找到一个懒惰的工作人员,并将任务传递给它,否则如果每个工作人员都很忙,则将 UOW 添加到队列中并等待空闲工作人员):
balancer.work(UnitOfWork(some=parameters))
# internally, find free worker, pass UOW, ouw.init() + ouw.run()
Run Code Online (Sandbox Code Playgroud)
这可能吗(或者是疯了)?
PS我熟悉multiprocessing Process类和进程池,但是:
Process实例都启动一个进程(是的:))-我想要固定数量的工人Process可以进行通用工作的实例我发现这篇文章解释了如何使用 ctr+c 终止正在运行的多处理代码。以下代码完全正常工作(可以使用 ctrl+c 终止它):
#!/usr/bin/env python
# Copyright (c) 2011 John Reese
# Licensed under the MIT License
import multiprocessing
import os
import signal
import time
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
def run_worker():
time.sleep(15)
def main():
print "Initializng 5 workers"
pool = multiprocessing.Pool(5, init_worker)
print "Starting 3 jobs of 15 seconds each"
for i in range(3):
pool.apply_async(run_worker)
try:
print "Waiting 10 seconds"
time.sleep(10)
except KeyboardInterrupt:
print "Caught KeyboardInterrupt, terminating workers"
pool.terminate()
pool.join()
else:
print "Quitting normally"
pool.close()
pool.join() …Run Code Online (Sandbox Code Playgroud) python ×10
asynchronous ×2
performance ×2
dimensions ×1
loops ×1
numpy ×1
python-3.x ×1
queue ×1
scipy ×1
sum ×1