ins*_*get 11 python multithreading multiprocessing python-3.3 concurrent.futures
我是期货模块的新手,有一项可以从并行化中受益的任务; 但我似乎无法弄清楚如何设置线程的功能和进程的功能.我很感激任何人都可以帮助解决这个问题.
我正在运行粒子群优化(PSO).在没有详细了解PSO本身的情况下,这里是我的代码的基本布局:
有一个Particle类,有一个getFitness(self)方法(计算一些指标并存储它self.fitness).PSO模拟具有多个粒子实例(对于某些模拟,容易超过10; 100s甚至1000s).
每隔一段时间,我就要计算粒子的适应度.目前,我在for循环中执行此操作:
for p in listOfParticles:
p.getFitness(args)
Run Code Online (Sandbox Code Playgroud)
但是,我注意到每个粒子的适应度可以彼此独立地计算.这使得该适应度计算成为并行化的主要候选者.的确,我能做到map(lambda p: p.getFitness(args), listOfParticles).
现在,我可以轻松地做到这一点futures.ProcessPoolExecutor:
with futures.ProcessPoolExecutor() as e:
e.map(lambda p: p.getFitness(args), listOfParticles)
Run Code Online (Sandbox Code Playgroud)
由于调用的副作用p.getFitness存储在每个粒子本身,我不必担心从中返回futures.ProcessPoolExecutor().
到现在为止还挺好.但现在我注意到ProcessPoolExecutor创建了新进程,这意味着它会复制内存,这很慢.我希望能够共享内存 - 所以我应该使用线程.这很好,直到我意识到在每个进程中运行多个线程的几个进程可能会更快,因为多个线程仍然只运行在我的甜蜜的8核机器的一个处理器上.
这就是我遇到麻烦的地方:
根据我见过的例子,ThreadPoolExecutor对a进行操作list.那样做ProcessPoolExecutor.所以我不能做任何迭代ProcessPoolExecutor到农场,ThreadPoolExecutor因为那样ThreadPoolExecutor就会得到一个单一的对象(参见我的尝试,发布在下面).
另一方面,我不能自己切片listOfParticles,因为我想做ThreadPoolExecutor自己的魔术来弄清楚需要多少线程.
所以,最重要的问题(最后):
我应该如何构建我的代码,以便我可以使用进程和线程有效地并行化以下内容:
for p in listOfParticles:
p.getFitness()
Run Code Online (Sandbox Code Playgroud)
这是我一直在尝试的,但我不敢尝试运行它,因为我知道它不会起作用:
>>> def threadize(func, L, mw):
... with futures.ThreadpoolExecutor(max_workers=mw) as executor:
... for i in L:
... executor.submit(func, i)
...
>>> def processize(func, L, mw):
... with futures.ProcessPoolExecutor() as executor:
... executor.map(lambda i: threadize(func, i, mw), L)
...
Run Code Online (Sandbox Code Playgroud)
我很感激如何解决这个问题,甚至是如何改进我的方法
万一重要,我在python3.3.2上
Tim*_*ers 13
我将为您提供工作代码,将进程与线程混合以解决问题,但这不是您所期望的;-)首先,制作一个不会危及您的真实数据的模拟程序.尝试无害的东西.所以这是开始:
class Particle:
def __init__(self, i):
self.i = i
self.fitness = None
def getfitness(self):
self.fitness = 2 * self.i
Run Code Online (Sandbox Code Playgroud)
现在我们可以玩一些东西了.接下来一些常量:
MAX_PROCESSES = 3
MAX_THREADS = 2 # per process
CHUNKSIZE = 100
Run Code Online (Sandbox Code Playgroud)
摆弄那些品尝. CHUNKSIZE将在稍后解释.
你的第一个惊喜就是我最低级别的工人职能.那是因为你在这里过于乐观了:
由于调用p.getFitness的副作用存储在每个粒子本身中,我不必担心从futures.ProcessPoolExecutor()获得返回.
唉,没有在工作进程做可以对任何影响Particle您的主要程序实例.工作进程工作在副本的Particle情况下,通过写入时复制的实现是否fork()还是因为它的工作从一取储存制成的副本Particle进程间传递咸菜.
因此,如果您希望主程序查看健身结果,则需要安排将信息发送回主程序.因为我对你的实际程序知之甚少,所以我假设这Particle().i是一个唯一的整数,并且主程序可以很容易地将整数映射回Particle实例.考虑到这一点,这里最低级别的worker函数需要返回一对:唯一整数和适应度结果:
def thread_worker(p):
p.getfitness()
return (p.i, p.fitness)
Run Code Online (Sandbox Code Playgroud)
鉴于此,很容易Particle跨越线程传播s 列表,并返回(particle_id, fitness)结果列表:
def proc_worker(ps):
import concurrent.futures as cf
with cf.ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
result = list(e.map(thread_worker, ps))
return result
Run Code Online (Sandbox Code Playgroud)
笔记:
list()请强制e.map()实现列表中的所有结果.只需编写代码来Particle跨进程传播s 列表,并检索结果.这很容易做到multiprocessing,所以这就是我要用的东西.我不知道是否concurrent.futures可以这样做(假设我们也在混合线程),但不关心.但是因为我给你工作代码,你可以玩那个并报告回来;-)
if __name__ == "__main__":
import multiprocessing
particles = [Particle(i) for i in range(100000)]
# Note the code below relies on that particles[i].i == i
assert all(particles[i].i == i for i in range(len(particles)))
pool = multiprocessing.Pool(MAX_PROCESSES)
for result_list in pool.imap_unordered(proc_worker,
(particles[i: i+CHUNKSIZE]
for i in range(0, len(particles), CHUNKSIZE))):
for i, fitness in result_list:
particles[i].fitness = fitness
pool.close()
pool.join()
assert all(p.fitness == 2*p.i for p in particles)
Run Code Online (Sandbox Code Playgroud)
笔记:
Particle"手动" 将s 列表分成块.这CHUNKSIZE是为了什么.这是因为工作进程想要的清单的Particles到上工作,并依次是因为那是什么futures map()功能想.无论如何,这都是一个好主意,所以你得到一些真正的好处,以换取每次调用进程间的开销.imap_unordered()不保证返回结果的顺序.这使实施更加自由,尽可能有效地安排工作.我们不关心这里的订单,所以没关系.(particle_id, fitness)结果,并相应地修改Particle实例.也许你的真实.getfitness对Particle实例做出其他突变- 无法猜测.无论如何,主程序永远不会看到工人"通过魔法"发生任何突变 - 你必须明确安排.在限制中,您可以(particle_id, particle_instance)改为返回对,并替换Particle主程序中的实例.然后他们会反映工人流程中的所有突变.玩得开心 :-)
原来它很容易更换multiprocessing.这是变化.这也(如前所述)取代了原始Particle实例,以便捕获所有突变.不过在这里有一个权衡:腌制一个实例需要"更多"字节而不是腌制一个"健身"结果.更多的网络流量.选择你的毒药;-)
返回变异的实例只需要替换最后一行thread_worker(),如下所示:
return (p.i, p)
Run Code Online (Sandbox Code Playgroud)
然后用这个替换所有的" 主 "块:
def update_fitness():
import concurrent.futures as cf
with cf.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as e:
for result_list in e.map(proc_worker,
(particles[i: i+CHUNKSIZE]
for i in range(0, len(particles), CHUNKSIZE))):
for i, p in result_list:
particles[i] = p
if __name__ == "__main__":
particles = [Particle(i) for i in range(500000)]
assert all(particles[i].i == i for i in range(len(particles)))
update_fitness()
assert all(particles[i].i == i for i in range(len(particles)))
assert all(p.fitness == 2*p.i for p in particles)
Run Code Online (Sandbox Code Playgroud)
代码与multiprocessor舞蹈非常相似.就个人而言,我会使用该multiprocessing版本,因为imap_unordered它很有价值.这是简化界面的问题:它们通常以隐藏有用可能性为代价来购买简单性.
| 归档时间: |
|
| 查看次数: |
4112 次 |
| 最近记录: |