如何从 Scala 中的 DataFrame 在 Spark 中创建分布式稀疏矩阵

mon*_*mon 5 scala sparse-matrix apache-spark apache-spark-mllib

问题

请帮助找到从 DataFrame 中的(用户、特征、值)记录创建分布式矩阵的方法,其中特征及其值存储在列中。

以下是数据摘录,但用户数量和功能较多,并且并非所有功能都经过用户测试。因此,许多特征值都是空的并且被归为 0。

例如,血液测试可能以血糖水平胆固醇水平等为特征。如果这些级别不可接受,则将值设置为 1。但并非所有功能都会针对用户(或患者)进行测试。

+----+-------+-----+
|user|feature|value|
+----+-------+-----+
|  14|      0|    1|
|  14|    222|    1|
|  14|    200|    1|
|  22|      0|    1|
|  22|     32|    1|
|  22|    147|    1|
|  22|    279|    1|
|  22|    330|    1|
|  22|    363|    1|
|  22|    162|    1|
|  22|    811|    1|
|  22|    290|    1|
|  22|    335|    1|
|  22|    681|    1|
|  22|    786|    1|
|  22|    789|    1|
|  22|    842|    1|
|  22|    856|    1|
|  22|    881|    1|
+----+-------+-----+

Run Code Online (Sandbox Code Playgroud)

如果特征已经是列,那么就有一些方法可以解释。

但这种情况并非如此。因此,一种方法可能是旋转数据框来应用这些方法。

+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user|  0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  14|  1|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|  22|  1|  1|  1|  1|  0|  0|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)

然后使用行到向量的转换。我想使用其中之一:

  • 矢量汇编器
  • org.apache.spark.mllib.linalg.Vectors.fromML
  • org.apache.spark.mllib.linalg.distributed.MatrixEntry

然而,由于会有许多空值被估算为 0,因此旋转后的数据帧将消耗更多的内存空间。此外,旋转分布在多个节点之间的大型数据帧也会导致大量的洗牌。

因此,寻求意见、想法、建议。

有关的

环境

火花2.4.4

mon*_*mon 1

解决方案

  1. 为每个输入行创建一个 RDD[(user, feature)]。
  2. groupByKey 创建 RDD[(user, [feature+])]。
  3. 创建一个 RDD[IndexedRow],其中每个 IndexedRow 代表以下所有现有功能。
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user|  0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  14|  1|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
Run Code Online (Sandbox Code Playgroud)
  1. 将 RDD[IndexedRow] 转换为 IndexedRowMatrix。

对于乘积运算,将RowIndexedMatrix转换为支持分布式乘积运算的BlockMatrix。

将每条原始记录转换为 IndexedRow

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

def toIndexedRow(userToFeaturesMap:(Int, Iterable[Int]), maxFeatureId: Int): IndexedRow = {
    userToFeaturesMap match {
        case (userId, featureIDs) => {
            val featureCountKV = featureIDs.map(i => (i, 1.0)).toSeq
            new IndexedRow (
                userId,
                Vectors.sparse(maxFeatureId + 1, featureCountKV)
            )
        }
    }
}

val userToFeatureCounters= featureData.rdd
    .map(rowPF => (rowPF.getInt(0), rowPF.getInt(1)))  // Out from ROW[(userId, featureId)]
    .groupByKey()                                      // (userId, Iterable(featureId))
    .map(
        userToFeatureIDsMap => toIndexedRow(userToFeatureIDsMap, maxFeatureId)
    )                                                 // IndexedRow(userId, Vector((featureId, 1)))
Run Code Online (Sandbox Code Playgroud)

创建索引行矩阵

val userFeatureIndexedMatrix = new IndexedRowMatrix(userToFeatureCounters)
Run Code Online (Sandbox Code Playgroud)

通过 BlockMatrix 传输 IndexedRowMatrix,因为 IndexedRowMatrix 不支持转置

val userFeatureBlockMatrixTransposed = userFeatureBlockMatrix
    .transpose
Run Code Online (Sandbox Code Playgroud)

使用 BlockMatrix 作为 IndexedRowMatrix 创建的产品需要右侧的 Local DenseMatrix。

val featuresTogetherIndexedMatrix = userFeatureBlockMatrix
    .multiply(userFeatureBlockMatrixTransposed)
    .toIndexedRowMatrix
Run Code Online (Sandbox Code Playgroud)