如何在Python中进行并行编程

ilo*_*cp3 129 python parallel-processing

对于C++,我们可以使用OpenMP进行并行编程; 但是,OpenMP不适用于Python.如果我想并行我的python程序的某些部分,我该怎么办?

代码的结构可以被认为是:

 solve1(A)
 solve2(B)
Run Code Online (Sandbox Code Playgroud)

哪里solve1solve2是两个独立的功能.如何并行运行这种代码而不是按顺序运行以减少运行时间?希望可以有人帮帮我.首先十分感谢.代码是:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt
Run Code Online (Sandbox Code Playgroud)

setinner和setouter是两个独立的函数.这就是我要平行的地方......

Mat*_*son 150

您可以使用多处理模块.对于这种情况,我可能会使用处理池:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)
Run Code Online (Sandbox Code Playgroud)

这将生成可以为您执行通用工作的进程.由于我们没有通过processes,它将为您机器上的每个CPU核心生成一个进程.每个CPU核心可以同时执行一个进程.

如果要将列表映射到单个函数,则可以执行以下操作:

args = [A, B]
results = pool.map(solve1, args)
Run Code Online (Sandbox Code Playgroud)

不要使用线程,因为GIL会锁定python对象上的任何操作.

  • 这里的超时时间是几点? (2认同)

Rob*_*ara 20

Ray可以非常优雅地完成这项工作.

要并行化您的示例,您需要使用@ray.remote装饰器定义您的函数,然后使用它来调用它们.remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])
Run Code Online (Sandbox Code Playgroud)

多处理模块相比,这有许多优点.

  1. 相同的代码将在多核计算机和一组计算机上运行.
  2. 进程通过共享内存和零拷贝序列化有效地共享数据.
  3. 错误消息传播得很好.
  4. 这些函数调用可以组合在一起,例如,

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
    
    Run Code Online (Sandbox Code Playgroud)
  5. 除了远程调用函数之外,还可以将类作为actor远程实例化.

请注意,Ray是我一直在帮助开发的框架.

  • 通常这种错误意味着你需要升级`pip`.我建议尝试`pip install --upgrade pip`.如果你需要使用`sudo`,那么你用来安装`ray`的`pip`版本可能与升级版本不同.你可以查看`pip --version`.此外,Windows目前不受支持,所以如果你在Windows上可能是问题. (2认同)
  • 请注意,这主要用于在多台机器上分配并发作业。 (2认同)
  • 它实际上针对单机案例和群集设置进行了优化.许多设计决策(例如,共享内存,零拷贝序列化)旨在很好地支持单个机器. (2认同)
  • 如果文档能更多地指出这一点,那就太好了。通过阅读文档,我感觉到它并不是真正针对单机案例。 (2认同)

bau*_*ace 6

CPython 使用全局解释器锁,这使得并行编程比 C++ 更有趣

本主题有几个有用的示例和挑战描述:

在 Linux 上使用任务集的多核系统上的 Python 全局解释器锁 (GIL) 解决方法?

  • 您称无法_真正_同时运行代码“有趣”吗?:-/ (16认同)

小智 6

正如其他人所说,解决方案是使用多个进程。然而,哪种框架更合适取决于许多因素。除了已经提到的,还有charm4pympi4py(我是charm4py的开发者)。

有一种比使用工作池抽象更有效的方法来实现上述示例。G在 1000 次迭代中,主循环一遍又一遍地向工作人员发送相同的参数(包括完整图)。由于至少有一个 worker 将驻留在不同的进程中,因此这涉及复制参数并将其发送到其他进程。根据对象的大小,这可能会非常昂贵。相反,让工作人员存储状态并简单地发送更新的信息是有意义的。

例如,在charm4py 中,这可以像这样完成:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...
Run Code Online (Sandbox Code Playgroud)

请注意,对于此示例,我们实际上只需要一名工人。主循环可以执行其中一个函数,并让 worker 执行另一个。但我的代码有助于说明以下几点:

  1. 工人 A 在进程 0 中运行(与主循环相同)。当result_a.get()被阻塞等待结果时,工人 A 在同一个进程中进行计算。
  2. 参数通过引用自动传递给工人 A,因为它在同一个进程中(不涉及复制)。


vah*_*ari 5

您可以使用joblib库进行并行计算和多重处理。

from joblib import Parallel, delayed
Run Code Online (Sandbox Code Playgroud)

foo您可以简单地创建一个想要并行运行的函数,并基于以下代码实现并行处理:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)
Run Code Online (Sandbox Code Playgroud)

num_cores可以从图书馆获取如下multiprocessing

import multiprocessing

num_cores = multiprocessing.cpu_count()
Run Code Online (Sandbox Code Playgroud)

如果您有一个具有多个输入参数的函数,并且您只想通过列表迭代其中一个参数,则可以使用库partial中的函数,functools如下所示:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)
Run Code Online (Sandbox Code Playgroud)

您可以在此处找到有关 python 和 R 多重处理的完整说明以及几个示例。