PyR*_*red 6 python parallel-processing ray
我想使用ray对列表的每个元素并行执行函数操作。下面是一个简化的片段
import numpy as np
import time
import ray
import psutil
num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)
@ray.remote
def f(a, b, c):
return a * b - c
def g(a, b, c):
return a * b - c
def my_func_par(large_list):
# arguments a and b are constant just to illustrate
# argument c is is each element of a list large_list
[f.remote(1.5, 2, i) for i in large_list]
def my_func_seq(large_list):
# arguments a anf b are constant just to illustrate
# argument c is is each element of a list large_list
[g(1.5, 2, i) for i in large_list]
my_list = np.arange(1, 10000)
s = time.time()
my_func_par(my_list)
print(time.time() - s)
>>> 2.007
s = time.time()
my_func_seq(my_list)
print(time.time() - s)
>>> 0.0372
Run Code Online (Sandbox Code Playgroud)
问题是,当我计时 时my_func_par,它比 慢得多(如上所示~54x)my_func_seq。ray 的一位作者确实回答了此博客上的评论,该评论似乎解释了我正在做的事情是设置len(large_list)不同的任务,这是不正确的。
如何使用ray并修改上面的代码以并行运行它?(也许通过分割large_list成块,块的数量等于CPU的数量)
编辑:这个问题有两个重要标准
f需要接受多个参数ray.put(large_list)以便larg_list变量可以存储在共享内存中而不是复制到每个处理器小智 10
补充一下桑上面所说的:
Ray Distributed multiprocessing.Pool支持固定大小的 Ray Actor 池,以便于并行化。
import numpy as np
import time
import ray
from ray.util.multiprocessing import Pool
pool = Pool()
def f(x):
# time.sleep(1)
return 1.5 * 2 - x
def my_func_par(large_list):
pool.map(f, large_list)
def my_func_seq(large_list):
[f(i) for i in large_list]
my_list = np.arange(1, 10000)
s = time.time()
my_func_par(my_list)
print('Parallel time: ' + str(time.time() - s))
s = time.time()
my_func_seq(my_list)
print('Sequential time: ' + str(time.time() - s))
Run Code Online (Sandbox Code Playgroud)
使用上面的代码,my_func_par运行速度要快得多(大约0.1秒)。如果您使用代码并f(x)通过类似的方法减慢速度time.sleep,您可以看到多处理的明显优势。
小智 3
并行化版本较慢的原因是运行射线任务不可避免地会产生运行开销(尽管它付出了很多努力来优化它)。这是因为并行运行需要进程间通信、序列化等。
话虽这么说,如果你的函数真的很快(与运行函数一样快)比分布式计算中的其他开销花费的时间更少,你的代码完全就是这种情况,因为函数 f 真的很小。我认为它会花费更少的时间运行该函数需要一微秒)。
这意味着您应该使 f 函数的计算量更大,以便从并行化中受益。您提出的解决方案可能不起作用,因为即使在那之后,函数 f 可能仍然足够轻量,具体取决于您的列表大小。
| 归档时间: |
|
| 查看次数: |
9824 次 |
| 最近记录: |