ProcessPoolExecutor中的ThreadPoolExecutor

ins*_*get 11 python multithreading multiprocessing python-3.3 concurrent.futures

我是期货模块的新手,有一项可以从并行化中受益的任务; 但我似乎无法弄清楚如何设置线程的功能和进程的功能.我很感激任何人都可以帮助解决这个问题.

我正在运行粒子群优化(PSO).在没有详细了解PSO本身的情况下,这里是我的代码的基本布局:

有一个Particle类,有一个getFitness(self)方法(计算一些指标并存储它self.fitness).PSO模拟具有多个粒子实例(对于某些模拟,容易超过10; 100s甚至1000s).
每隔一段时间,我就要计算粒子的适应度.目前,我在for循环中执行此操作:

for p in listOfParticles:
  p.getFitness(args)
Run Code Online (Sandbox Code Playgroud)

但是,我注意到每个粒子的适应度可以彼此独立地计算.这使得该适应度计算成为并行化的主要候选者.的确,我能做到map(lambda p: p.getFitness(args), listOfParticles).

现在,我可以轻松地做到这一点futures.ProcessPoolExecutor:

with futures.ProcessPoolExecutor() as e:
  e.map(lambda p: p.getFitness(args), listOfParticles)
Run Code Online (Sandbox Code Playgroud)

由于调用的副作用p.getFitness存储在每个粒子本身,我不必担心从中返回futures.ProcessPoolExecutor().

到现在为止还挺好.但现在我注意到ProcessPoolExecutor创建了新进程,这意味着它会复制内存,这很慢.我希望能够共享内存 - 所以我应该使用线程.这很好,直到我意识到在每个进程中运行多个线程的几个进程可能会更快,因为多个线程仍然只运行在我的甜蜜的8核机器的一个处理器上.

这就是我遇到麻烦的地方:
根据我见过的例子,ThreadPoolExecutor对a进行操作list.那样做ProcessPoolExecutor.所以我不能做任何迭代ProcessPoolExecutor到农场,ThreadPoolExecutor因为那样ThreadPoolExecutor就会得到一个单一的对象(参见我的尝试,发布在下面).
另一方面,我不能自己切片listOfParticles,因为我想做ThreadPoolExecutor自己的魔术来弄清楚需要多少线程.

所以,最重要的问题(最后):
我应该如何构建我的代码,以便我可以使用进程和线程有效地并行化以下内容:

for p in listOfParticles:
  p.getFitness()
Run Code Online (Sandbox Code Playgroud)

这是我一直在尝试的,但我不敢尝试运行它,因为我知道它不会起作用:

>>> def threadize(func, L, mw):
...     with futures.ThreadpoolExecutor(max_workers=mw) as executor:
...             for i in L:
...                     executor.submit(func, i)
... 

>>> def processize(func, L, mw):
...     with futures.ProcessPoolExecutor() as executor:
...             executor.map(lambda i: threadize(func, i, mw), L)
...
Run Code Online (Sandbox Code Playgroud)

我很感激如何解决这个问题,甚至是如何改进我的方法

万一重要,我在python3.3.2上

Tim*_*ers 13

我将为您提供工作代码,将进程与线程混合以解决问题,但这不是您所期望的;-)首先,制作一个不会危及您的真实数据的模拟程序.尝试无害的东西.所以这是开始:

class Particle:
    def __init__(self, i):
        self.i = i
        self.fitness = None
    def getfitness(self):
        self.fitness = 2 * self.i
Run Code Online (Sandbox Code Playgroud)

现在我们可以玩一些东西了.接下来一些常量:

MAX_PROCESSES = 3
MAX_THREADS = 2 # per process
CHUNKSIZE = 100
Run Code Online (Sandbox Code Playgroud)

摆弄那些品尝. CHUNKSIZE将在稍后解释.

你的第一个惊喜就是我最低级别的工人职能.那是因为你在这里过于乐观了:

由于调用p.getFitness的副作用存储在每个粒子本身中,我不必担心从futures.ProcessPoolExecutor()获得返回.

唉,没有在工作进程做可以对任何影响Particle您的主要程序实例.工作进程工作在副本Particle情况下,通过写入时复制的实现是否fork()还是因为它的工作从一取储存制成的副本Particle进程间传递咸菜.

因此,如果您希望主程序查看健身结果,则需要安排将信息发送回主程序.因为我对你的实际程序知之甚少,所以我假设这Particle().i是一个唯一的整数,并且主程序可以很容易地将整数映射回Particle实例.考虑到这一点,这里最低级别的worker函数需要返回一对:唯一整数和适应度结果:

def thread_worker(p):
    p.getfitness()
    return (p.i, p.fitness)
Run Code Online (Sandbox Code Playgroud)

鉴于此,很容易Particle跨越线程传播s 列表,并返回(particle_id, fitness)结果列表:

def proc_worker(ps):
    import concurrent.futures as cf
    with cf.ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
        result = list(e.map(thread_worker, ps))
    return result
Run Code Online (Sandbox Code Playgroud)

笔记:

  1. 这是每个工作进程将运行的功能.
  2. 我正在使用Python 3,因此list()请强制e.map()实现列表中的所有结果.
  3. 正如评论中所提到的,在CPython下,跨线程传播CPU绑定任务比在单个线程中完成所有这些任务要.

只需编写代码来Particle跨进程传播s 列表,并检索结果.这很容易做到multiprocessing,所以这就是我要用的东西.我不知道是否concurrent.futures可以这样做(假设我们也在混合线程),但不关心.但是因为我给你工作代码,你可以玩那个并报告回来;-)

if __name__ == "__main__":
    import multiprocessing

    particles = [Particle(i) for i in range(100000)]
    # Note the code below relies on that particles[i].i == i
    assert all(particles[i].i == i for i in range(len(particles)))

    pool = multiprocessing.Pool(MAX_PROCESSES)
    for result_list in pool.imap_unordered(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i in range(0, len(particles), CHUNKSIZE))):
        for i, fitness in result_list:
            particles[i].fitness = fitness

    pool.close()
    pool.join()

    assert all(p.fitness == 2*p.i for p in particles)
Run Code Online (Sandbox Code Playgroud)

笔记:

  1. 我正在Particle"手动" 将s 列表分成块.这CHUNKSIZE是为了什么.这是因为工作进程想要的清单Particles到上工作,并依次是因为那是什么futures map()功能想.无论如何,这都是一个好主意,所以你得到一些真正的好处,以换取每次调用进程间的开销.
  2. imap_unordered()不保证返回结果的顺序.这使实施更加自由,尽可能有效地安排工作.我们不关心这里的订单,所以没关系.
  3. 请注意,循环检索(particle_id, fitness)结果,并相应地修改Particle实例.也许你的真实.getfitnessParticle实例做出其他突变- 无法猜测.无论如何,主程序永远不会看到工人"通过魔法"发生任何突变 - 你必须明确安排.在限制中,您可以(particle_id, particle_instance)改为返回对,并替换Particle主程序中的实例.然后他们会反映工人流程中的所有突变.

玩得开心 :-)

期货一路走低

原来它很容易更换multiprocessing.这是变化.这也(如前所述)取代了原始Particle实例,以便捕获所有突变.不过在这里有一个权衡:腌制一个实例需要"更多"字节而不是腌制一个"健身"结果.更多的网络流量.选择你的毒药;-)

返回变异的实例只需要替换最后一行thread_worker(),如下所示:

return (p.i, p)
Run Code Online (Sandbox Code Playgroud)

然后用这个替换所有的" "块:

def update_fitness():
    import concurrent.futures as cf
    with cf.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as e:
        for result_list in e.map(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i in range(0, len(particles), CHUNKSIZE))):
            for i, p in result_list:
                particles[i] = p

if __name__ == "__main__":
    particles = [Particle(i) for i in range(500000)]
    assert all(particles[i].i == i for i in range(len(particles)))

    update_fitness()

    assert all(particles[i].i == i for i in range(len(particles)))
    assert all(p.fitness == 2*p.i for p in particles)
Run Code Online (Sandbox Code Playgroud)

代码与multiprocessor舞蹈非常相似.就个人而言,我会使用该multiprocessing版本,因为imap_unordered它很有价值.这是简化界面的问题:它们通常以隐藏有用可能性为代价来购买简单性.