并行化嵌套大 `(15e4 * 15e4)` for 循环以获得成对矩阵

Ama*_*war 1 python parallel-processing multithreading dask

我正在尝试并行化以下代码,它为每一行创建一个成对的结果。如下所示。

def get_custom_value(i, j):
    first = df[df['id'] == i]
    second = df[df['id'] == j]

    return int(first['val_1']) * int(second['val_1']) +\
            int(first['val_2']) * int(second['val_2'])

df = pd.DataFrame(
    {
        'id' : range(4),
        'val_1' : [3, 4, 5, 1],
        'val_2' : [2, 3, 1, 1]
    }
)

n = df.shape[0]

result = []

for i in range(n):
    for j in range(i+1, n):
        temp_value = get_custom_value(i, j)
        result.append([i, j, temp_value])
        if len(result) > 1e5:
            # store it in a local file and reset the result object.
            # Assume here some code to write to a local file here.
            result = []

print(result)
Run Code Online (Sandbox Code Playgroud)

我已经尝试过什么?下面是代码: 代码挂起。没有任何错误。

import itertools
import multiprocessing

paramlist = list(itertools.combinations(df.id, 2))
pool = multiprocessing.Pool(processes = 2)
result  = pool.map(get_custom_value, paramlist)
print(result)
Run Code Online (Sandbox Code Playgroud)

我可以用dask这个吗?

实际数据有超过15万条记录。即最终结果将有 (150,000 * 150,000 * 1/2) 对/行。考虑到结果对象的巨大尺寸,我有一个条件,如果满足,则存储结果。因此,实际result对象不会超过我的 RAM。

Jér*_*ard 5

使用的算法效率非常低。事实上,df[\'id\'] == i和都会迭代现实用例中包含 150_000 个项目的df[\'id\'] == j整个列。id因此,您的算法及时运行O(n^3)并执行大约 3_375_000_000_000_000 次比较,而最佳算法则运行在O(n^2)

\n

此外,CPython 循环非常慢,您应该尽可能避免使用它们。按名称获取 Pandas 数据框单元也非常慢。相反,您可以使用矢量化 Pandas/Numpy 函数

\n

此外,输出也效率不高:CPython 列表有点慢(因为动态引用计数对象)并且存储值(i,j)消耗的内存多出三倍。您可以将结果存储在矩阵中。可能是稀疏数组,也可能是紧凑 Numpy 数组列表

\n

此外,较大的数据结构通常较慢。如果您希望计算速度非常快,通常需要使其适合 CPU 缓存(几 MiB)。因此,要有效地处理数据帧,您当然需要就地计算它

\n

这是使用 Numpy 的相对有效的解决方案:

\n
import numpy as np\nval_1 = np.ascontiguousarray(df[\'val_1\'].to_numpy())\nval_2 = np.ascontiguousarray(df[\'val_2\'].to_numpy())\nresult = val_1.reshape(-1, 1) * val_1 + val_2.reshape(-1, 1) * val_2\n
Run Code Online (Sandbox Code Playgroud)\n

它生成一个n\xc2\xb2矩阵,其中 (i,j) 项可以使用 找到result[i, j]reshape(-1, 1)用于转置水平向量以获得垂直向量,然后从Numpy 广播中受益。请注意,您可以使用 过滤上三角部分np.triu(result, 1)

\n

您可以逐行生成结果,这样就不必分配巨大的数组:

\n
val_1 = np.ascontiguousarray(df[\'val_1\'].to_numpy())\nval_2 = np.ascontiguousarray(df[\'val_2\'].to_numpy())\n\nfor i in range(n-1):\n    first_val_1 = val_1[i]\n    first_val_2 = val_2[i]\n    line = first_val_1 * val_1[i+1:] + first_val_2 * val_2[i+1:]\n\n    # Store the line if needed with the i value so to know where it is\n
Run Code Online (Sandbox Code Playgroud)\n

如果您确实想从 Numpy 数组行生成低效列表,那么您可以使用np.vstack((np.repeat(i, n-i-1), np.arange(i+1, n), line)).T.tolist(). 但我强烈建议你要这样做(当然没有必要使用列表)。np.load请注意,您可以使用和有效地加载/存储 Numpy 数组np.save

\n

以下是我的机器上不同方法在随机 Pandas 数据帧上的性能(配备 i5-9600KF 处理器、2 个达到 40 GiB/s 的 DDR4 通道和一个快速 Nvme SSD,实际上可以以 800 MiB/s 的速度写入大文件) 15_000 条记录:

\n
Initial code:                 60500    seconds  (estimation)\nNumpy matrix:                     0.71 second\nNumpy line-by-line:               0.24 second\n\nTime to store all the lines:      0.50 second   (estimation)\nin a compact way on my SSD\n
Run Code Online (Sandbox Code Playgroud)\n

因此,Numpy 的逐行解决方案比初始代码快大约 250_000 倍!所有这一切都无需使用多核。事实上,在这种情况下,使用多个内核不会快得多,因为 RAM 是有限的共享资源,并且文件存储在大多数机器上并行使用时速度并不会快很多(事实上,HDD 在并行使用时速度较慢,因为它们本质上是顺序的) )。如果你真的想这样做,那么使用多处理绝对不是一个好的工具。请考虑改用Numba 或 Cython

\n