稀疏CSR阵列的核外处理

rth*_*rth 39 python scipy joblib dask apache-spark-mllib

如何在使用Python保存在磁盘上的稀疏CSR数组的块上并行应用某些函数?顺序地,这可以例如通过保存CSR阵列并且joblib.dump打开它joblib.load(.., mmap_mode="r")并逐个处理行的块来完成.使用dask可以更有效地完成这项工作吗?

特别是,假设一个人不需要在稀疏数组上完成所有可能的核心操作,而只需要并行加载行块(每个块是一个CSR数组)并对它们应用一些函数(在我的情况下它会例如estimator.predict(X)来自scikit-learn).

此外,磁盘上是否有适合此任务的文件格式?Joblib有效,但我不确定作为内存映射加载的CSR数组的(并行)性能; spark.mllib似乎使用一些自定义稀疏存储格式(似乎没有纯Python解析器)或LIBSVM格式(根据我的经验,scikit-learn中的解析器比它慢得多joblib.dump)...

注意:我在https://github.com/dask/dask/上阅读了文档,有关它的各种问题,但我仍然不确定如何最好地解决这个问题.

编辑:为了给出一个更实际的例子,下面是在密码数组的dask中工作的代码,但在使用带有此错误的稀疏数组时失败,

import numpy as np
import scipy.sparse

import joblib
import dask.array as da
from sklearn.utils import gen_batches

np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')

fh = joblib.load('X_dense.pkl', mmap_mode='r')

# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))

# computing the results with dask
x = da.from_array(fh, chunks=(2000))
results = x.sum(axis=1).compute()
Run Code Online (Sandbox Code Playgroud)

EDIT2:按照下面的讨论中,下面的例子中克服了以前的错误,但得到有关的人IndexError: tuple index out of rangedask/array/core.py:L3413,

import dask
# +imports from the example above
dask.set_options(get=dask.get)  # disable multiprocessing

fh = joblib.load('X_csr.pkl', mmap_mode='r')

def func(x):
    if x.ndim == 0:
        # dask does some heuristics with dummy data, if the x is a 0d array
        # the sum command would fail
        return x
    res = np.asarray(x.sum(axis=1, keepdims=True))
    return res

Xd = da.from_array(fh, chunks=(2000))
results_new = Xd.map_blocks(func).compute()
Run Code Online (Sandbox Code Playgroud)

Oba*_*bay 6

所以我对 joblib 或 dask 一无所知,更不用说您的应用程序特定数据格式了。但实际上可以从磁盘中分块读取稀疏矩阵,同时保留稀疏数据结构。

虽然CSR 格式维基百科文章很好地解释了它的工作原理,但我将简要回顾一下:

一些稀疏矩阵,例如:

1 0 2
0 0 3
4 5 6
Run Code Online (Sandbox Code Playgroud)

通过记住每个非零值及其所在的列来存储:

sparse.data    = 1 2 3 4 5 6  # acutal value
sparse.indices = 0 2 2 0 1 2  # number of column (0-indexed)
Run Code Online (Sandbox Code Playgroud)

现在我们仍然缺少行。压缩格式只存储每行中有多少非零值,而不是存储每个值行。

请注意,非零计数也会累加,因此以下数组包含直到并包括该行的非零值的数量。更复杂的是,数组总是以 a 开头,0因此包含num_rows+1条目:

sparse.indptr = 0 2 3 6
Run Code Online (Sandbox Code Playgroud)

所以直到并包括第二行有 3 个非零值,即1,23

既然我们已经解决了这个问题,我们就可以开始“切片”矩阵了。目标是为某些块构造data,indicesindptr数组。假设原始的巨大矩阵存储在三个二进制文件中,我们将增量读取这些文件。我们使用生成器重复yield一些块。

为此,我们需要知道每个块中有多少非零值,并读取相应数量的值和列索引。可以方便地从 indptr 数组中读取非零计数。这是通过从indptr对应于所需块大小的大文件中读取一些条目来实现的。indptr文件该部分的最后一个条目减去非零值的数量之前给出了该块中非零值的数量。所以块dataindices数组只是从大文件dataindices文件中切片。该indptr阵列需要用人工预加一个零(即是什么格式想要的,不要问我:d)。

然后我们可以用 chunk 构造一个稀疏矩阵dataindicesindptr得到一个新的稀疏矩阵。

需要注意的是,实际的矩阵大小不能仅从三个数组中直接重建。它要么是矩阵的最大列索引,要么是您不走运并且块中没有未确定的数据。所以我们还需要传递列数。

我可能以一种相当复杂的方式解释了事情,所以只需将其视为实现此类生成器的不透明代码片段即可:

import numpy as np
import scipy.sparse


def gen_batches(batch_size, sparse_data_path, sparse_indices_path, 
                sparse_indptr_path, dtype=np.float32, column_size=None):
    data_item_size = dtype().itemsize

    with open(sparse_data_path, 'rb') as data_file, \
            open(sparse_indices_path, 'rb') as indices_file, \
            open(sparse_indptr_path, 'rb') as indptr_file:
        nnz_before = np.fromstring(indptr_file.read(4), dtype=np.int32)

        while True:
            indptr_batch = np.frombuffer(nnz_before.tobytes() +
                              indptr_file.read(4*batch_size), dtype=np.int32)

            if len(indptr_batch) == 1:
                break

            batch_indptr = indptr_batch - nnz_before
            nnz_before = indptr_batch[-1]
            batch_nnz = np.asscalar(batch_indptr[-1])

            batch_data = np.frombuffer(data_file.read(
                                       data_item_size * batch_nnz), dtype=dtype)
            batch_indices = np.frombuffer(indices_file.read(
                                          4 * batch_nnz), dtype=np.int32)

            dimensions = (len(indptr_batch)-1, column_size)

            matrix = scipy.sparse.csr_matrix((batch_data, 
                           batch_indices, batch_indptr), shape=dimensions)

            yield matrix


if __name__ == '__main__':
    sparse = scipy.sparse.random(5, 4, density=0.1, format='csr', dtype=np.float32)

    sparse.data.tofile('sparse.data')        # dtype as specified above  ^^^^^^^^^^
    sparse.indices.tofile('sparse.indices')  # dtype=int32
    sparse.indptr.tofile('sparse.indptr')    # dtype=int32

    print(sparse.toarray())
    print('========')

    for batch in gen_batches(2, 'sparse.data', 'sparse.indices', 
                             'sparse.indptr', column_size=4):
        print(batch.toarray())
Run Code Online (Sandbox Code Playgroud)

numpy.ndarray.tofile()刚刚店二进制字节数组,所以你需要记住的数据格式。scipy.sparseindices和表示indptrint32,因此这是对总矩阵大小的限制。

我还对代码进行了基准测试,发现 scipy csr 矩阵构造函数是小矩阵的瓶颈。您的里程可能会有所不同,这只是“原则证明”。

如果需要更复杂的实现,或者某些东西太难了,请联系我:)