如何在python中使用多处理

ilo*_*cp3 5 python multiprocessing

我是python的新手,我想在下面的代码中进行并行编程,并希望在python中使用多处理来完成它.那么如何修改代码呢?我一直在使用Pool搜索方法,但发现了我可以遵循的有限示例.有人可以帮帮我吗?谢谢.

请注意,setinner和setouter是两个独立的函数,我想使用并行编程来减少运行时间.

def solve(Q,G,n):
    i = 0
    tol = 10**-4

    while i < 1000:

        inneropt,partition,x = setinner(Q,G,n)
        outeropt = setouter(Q,G,n)

        if (outeropt - inneropt)/(1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G,node1,node2)
        if i == 999:
            print "Maximum iteration reaches"
    print inneropt
Run Code Online (Sandbox Code Playgroud)

aba*_*ert 1

很难并行化需要从不同任务改变相同共享数据的代码。所以,我将假设setinnersetouter是非变异函数;如果事实并非如此,事情就会变得更加复杂。

\n\n

第一步是决定要并行做什么。

\n\n
\n\n

一件显而易见的事情就是同时执行setinner和。setouter它们彼此完全独立,并且总是需要同时完成。所以,这就是我要做的。而不是这样做:

\n\n
inneropt,partition,x = setinner(Q,G,n)\nouteropt = setouter(Q,G,n)\n
Run Code Online (Sandbox Code Playgroud)\n\n

\xe2\x80\xa6 我们想要将两个函数作为任务提交到池中,然后等待两者都完成,然后获取两者的结果。

\n\n

模块concurrent.futures(在 Python 2.x 中需要第三方向后移植)比模块multiprocessing(在 2.6+ 中位于 stdlib 中)更容易执行“等待两者完成”之类的操作,但在这种情况下,我们不需要任何花哨的东西;如果其中一个提前完成,那么在另一个完成之前我们就无事可做。所以,让我们坚持multiprocessing.apply_async

\n\n
pool = multiprocessing.Pool(2) # we never have more than 2 tasks to run\nwhile i < 1000:\n    # parallelly start both tasks\n    inner_result = pool.apply_async(setinner, (Q, G, n))\n    outer_result = pool.apply_async(setouter, (Q, G, n))\n\n    # sequentially wait for both tasks to finish and get their results\n    inneropt,partition,x = inner_result.get()\n    outeropt = outer_result.get()\n\n    # the rest of your loop is unchanged\n
Run Code Online (Sandbox Code Playgroud)\n\n

您可能希望将池移到函数之外,以便它永远存在并可供代码的其他部分使用。如果没有,您几乎肯定希望在函数结束时关闭池。(后来的版本multiprocessing让你只在语句中使用池with,但我认为这需要Python 3.2+,所以你必须明确地这样做。)

\n\n
\n\n

如果您想并行执行更多工作怎么办?好吧,如果不重构循环,这里没有其他明显的事情可做。在从和updateGraph获取结果之前,您无法执行任何操作,并且这里没有其他操作很慢。setinnersetouter

\n\n

但是,如果您可以重新组织事物,以便每个循环都setinner独立于之前的所有内容(在不知道您在做什么的情况下,您的算法\xe2\x80\x94 可能会也可能不可能),我不能猜测),您可以将 2000 个任务预先推送到队列中,然后根据需要循环获取结果。例如:

\n\n
pool = multiprocessing.Pool() # let it default to the number of cores\ninner_results = []\nouter_results = []\nfor _ in range(1000):\n    inner_results.append(pool.apply_async(setinner, (Q,G,n,i))\n    outer_results.append(pool.apply_async(setouter, (Q,G,n,i))\nwhile i < 1000:\n    inneropt,partition,x = inner_results.pop(0).get()\n    outeropt = outer_results.pop(0).get()\n    # result of your loop is the same as before\n
Run Code Online (Sandbox Code Playgroud)\n\n

当然,你可以把它做得更漂亮。

\n\n

例如,假设您很少需要超过几百次迭代,因此总是计算 1000 次是浪费的。您可以在启动时推送第一个 N,然后每次循环时再推送一次(或每 N 次再推送 N 次),这样您就不会进行超过 N 次浪费的迭代\xe2\x80\x94,您无法获得理想权衡在完美的并行性和最小的浪费之间,但你通常可以很好地调整它。

\n\n

另外,如果任务实际上并不需要那么长时间,但您有很多任务,您可能需要将它们分批处理。一种非常简单的方法是使用其中一种map变体而不是apply_async; 这可能会使您的获取代码稍微复杂一些,但它使排队和批处理代码变得完全微不足道(例如,对于100 个参数列表中的map每个参数,a为 10 只需两行简单的代码)。funcchunksize

\n