ilo*_*cp3 129 python parallel-processing
对于C++,我们可以使用OpenMP进行并行编程; 但是,OpenMP不适用于Python.如果我想并行我的python程序的某些部分,我该怎么办?
代码的结构可以被认为是:
solve1(A)
solve2(B)
Run Code Online (Sandbox Code Playgroud)
哪里solve1和solve2是两个独立的功能.如何并行运行这种代码而不是按顺序运行以减少运行时间?希望可以有人帮帮我.首先十分感谢.代码是:
def solve(Q, G, n):
i = 0
tol = 10 ** -4
while i < 1000:
inneropt, partition, x = setinner(Q, G, n)
outeropt = setouter(Q, G, n)
if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
break
node1 = partition[0]
node2 = partition[1]
G = updateGraph(G, node1, node2)
if i == 999:
print "Maximum iteration reaches"
print inneropt
Run Code Online (Sandbox Code Playgroud)
setinner和setouter是两个独立的函数.这就是我要平行的地方......
Mat*_*son 150
您可以使用多处理模块.对于这种情况,我可能会使用处理池:
from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A]) # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B]) # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)
Run Code Online (Sandbox Code Playgroud)
这将生成可以为您执行通用工作的进程.由于我们没有通过processes,它将为您机器上的每个CPU核心生成一个进程.每个CPU核心可以同时执行一个进程.
如果要将列表映射到单个函数,则可以执行以下操作:
args = [A, B]
results = pool.map(solve1, args)
Run Code Online (Sandbox Code Playgroud)
不要使用线程,因为GIL会锁定python对象上的任何操作.
Rob*_*ara 20
Ray可以非常优雅地完成这项工作.
要并行化您的示例,您需要使用@ray.remote装饰器定义您的函数,然后使用它来调用它们.remote.
import ray
ray.init()
# Define the functions.
@ray.remote
def solve1(a):
return 1
@ray.remote
def solve2(b):
return 2
# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)
# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])
Run Code Online (Sandbox Code Playgroud)
与多处理模块相比,这有许多优点.
这些函数调用可以组合在一起,例如,
@ray.remote
def f(x):
return x + 1
x_id = f.remote(1)
y_id = f.remote(x_id)
z_id = f.remote(y_id)
ray.get(z_id) # returns 4
Run Code Online (Sandbox Code Playgroud)请注意,Ray是我一直在帮助开发的框架.
CPython 使用全局解释器锁,这使得并行编程比 C++ 更有趣
本主题有几个有用的示例和挑战描述:
在 Linux 上使用任务集的多核系统上的 Python 全局解释器锁 (GIL) 解决方法?
小智 6
正如其他人所说,解决方案是使用多个进程。然而,哪种框架更合适取决于许多因素。除了已经提到的,还有charm4py和mpi4py(我是charm4py的开发者)。
有一种比使用工作池抽象更有效的方法来实现上述示例。G在 1000 次迭代中,主循环一遍又一遍地向工作人员发送相同的参数(包括完整图)。由于至少有一个 worker 将驻留在不同的进程中,因此这涉及复制参数并将其发送到其他进程。根据对象的大小,这可能会非常昂贵。相反,让工作人员存储状态并简单地发送更新的信息是有意义的。
例如,在charm4py 中,这可以像这样完成:
class Worker(Chare):
def __init__(self, Q, G, n):
self.G = G
...
def setinner(self, node1, node2):
self.updateGraph(node1, node2)
...
def solve(Q, G, n):
# create 2 workers, each on a different process, passing the initial state
worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
while i < 1000:
result_a = worker_a.setinner(node1, node2, ret=True) # execute setinner on worker A
result_b = worker_b.setouter(node1, node2, ret=True) # execute setouter on worker B
inneropt, partition, x = result_a.get() # wait for result from worker A
outeropt = result_b.get() # wait for result from worker B
...
Run Code Online (Sandbox Code Playgroud)
请注意,对于此示例,我们实际上只需要一名工人。主循环可以执行其中一个函数,并让 worker 执行另一个。但我的代码有助于说明以下几点:
result_a.get()被阻塞等待结果时,工人 A 在同一个进程中进行计算。您可以使用joblib库进行并行计算和多重处理。
from joblib import Parallel, delayed
Run Code Online (Sandbox Code Playgroud)
foo您可以简单地创建一个想要并行运行的函数,并基于以下代码实现并行处理:
output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)
Run Code Online (Sandbox Code Playgroud)
num_cores可以从图书馆获取如下multiprocessing:
import multiprocessing
num_cores = multiprocessing.cpu_count()
Run Code Online (Sandbox Code Playgroud)
如果您有一个具有多个输入参数的函数,并且您只想通过列表迭代其中一个参数,则可以使用库partial中的函数,functools如下所示:
from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
'''
body of the function
'''
return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)
Run Code Online (Sandbox Code Playgroud)
您可以在此处找到有关 python 和 R 多重处理的完整说明以及几个示例。
| 归档时间: |
|
| 查看次数: |
204921 次 |
| 最近记录: |