如何使用 python Ray 并行处理一个大列表?

PyR*_*red 6 python parallel-processing ray

我想使用ray对列表的每个元素并行执行函数操作。下面是一个简化的片段

import numpy as np
import time

import ray
import psutil
num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)


@ray.remote
def f(a, b, c):
    return a * b - c


def g(a, b, c):
    return a * b - c


def my_func_par(large_list):
    # arguments a and b are constant just to illustrate
    # argument c is is each element of a list large_list
    [f.remote(1.5, 2, i) for i in large_list]


def my_func_seq(large_list):
    # arguments a anf b are constant just to illustrate
    # argument c is is each element of a list large_list
    [g(1.5, 2, i) for i in large_list]

my_list = np.arange(1, 10000)

s = time.time()
my_func_par(my_list)
print(time.time() - s)
>>> 2.007

s = time.time()
my_func_seq(my_list)
print(time.time() - s)
>>> 0.0372
Run Code Online (Sandbox Code Playgroud)

问题是,当我计时 时my_func_par,它比 慢得多(如上所示~54x)my_func_seq。ray 的一位作者确实回答了此博客上的评论,该评论似乎解释了我正在做的事情是设置len(large_list)不同的任务,这是不正确的。

如何使用ray并修改上面的代码以并行运行它?(也许通过分割large_list成块,块的数量等于CPU的数量)

编辑:这个问题有两个重要标准

  • 该函数f需要接受多个参数
  • 可能需要使用ray.put(large_list)以便larg_list变量可以存储在共享内存中而不是复制到每个处理器

小智 10

补充一下桑上面所说的:

Ray Distributed multiprocessing.Pool支持固定大小的 Ray Actor 池,以便于并行化。

import numpy as np
import time

import ray
from ray.util.multiprocessing import Pool
pool = Pool()

def f(x):
    # time.sleep(1)
    return 1.5 * 2 - x

def my_func_par(large_list):
    pool.map(f, large_list)

def my_func_seq(large_list):
    [f(i) for i in large_list]

my_list = np.arange(1, 10000)

s = time.time()
my_func_par(my_list)
print('Parallel time: ' + str(time.time() - s))

s = time.time()
my_func_seq(my_list)
print('Sequential time: ' + str(time.time() - s))
Run Code Online (Sandbox Code Playgroud)

使用上面的代码,my_func_par运行速度要快得多(大约0.1秒)。如果您使用代码并f(x)通过类似的方法减慢速度time.sleep,您可以看到多处理的明显优势。

  • 如果“f”有多个参数,您将如何更改“pool.map()”? (2认同)

小智 3

并行化版本较慢的原因是运行射线任务不可避免地会产生运行开销(尽管它付出了很多努力来优化它)。这是因为并行运行需要进程间通信、序列化等。

话虽这么说,如果你的函数真的很快(与运行函数一样快)比分布式计算中的其他开销花费的时间更少,你的代码完全就是这种情况,因为函数 f 真的很小。我认为它会花费更少的时间运行该函数需要一微秒)。

这意味着您应该使 f 函数的计算量更大,以便从并行化中受益。您提出的解决方案可能不起作用,因为即使在那之后,函数 f 可能仍然足够轻量,具体取决于您的列表大小。