在大型数据集上将 Spark DataFrame 从长到宽重塑

SH *_* Y. 5 scala r apache-spark apache-spark-sql

我正在尝试使用 Spark DataFrame API 将我的数据框从长到宽重塑。数据集是学生提问的问题和答案的集合。这是一个巨大的数据集,Q(问题)和 A(答案)大约范围从 1 到 50000。我想收集所有可能的 Q*A 对并使用它们来构建列。如果学生对问题 1 的回答为 1,我们将值 1 分配给第 1_1 列。否则,我们给它一个0。数据集已经在S_ID、Q、A上进行了去重。

在 R 中,我可以简单地使用库 reshape2 中的 dcast,但我不知道如何使用 Spark 来做到这一点。我在下面的链接中找到了旋转的解决方案,但它需要固定数量的不同对的 Q*A。 http://rajasoftware.net/index.php/database/9​​1446/scala-apache-spark-pivot-dataframes-pivot-spark-dataframe

我还尝试使用用户定义的函数连接 Q 和 A,并应用交叉表但是,我从控制台收到以下错误,即使到目前为止我只在示例数据文件上测试我的代码 -

The maximum limit of le6 pairs have been collected, which may not be all of the pairs.  
Please try reducing the amount of distinct items in your columns.
Run Code Online (Sandbox Code Playgroud)

原始数据:

S_ID、Q、A
1、1、1
1、2、2
1、3、3
2、1、1
2、2、3
2、3、4
2、4、5

=> 长到宽变换后:


S_ID、QA_1_1、QA_2_2、QA_3_3 、QA_2_3、QA_3_4、QA_4_5 1、1、1、1、0、0、0
2、1、0、0、1、1、1

R code.  
library(dplyr); library(reshape2);  
df1 <- df %>% group_by(S_ID, Q, A) %>% filter(row_number()==1) %>% mutate(temp=1)  
df1 %>% dcast(S_ID ~ Q + A, value.var="temp", fill=0)  

Spark code.
val fnConcatenate = udf((x: String, y: String) => {"QA_"+ x +"_" + y})
df1 = df.distinct.withColumn("QA", fnConcatenate($"Q", $"A"))
df2 = stat.crosstab("S_ID", "QA")
Run Code Online (Sandbox Code Playgroud)

任何想法将不胜感激。

zer*_*323 2

您在这里尝试执行的操作在设计上是错误的,原因有两个:

  1. 您可以用密集数据集替换稀疏数据集。当涉及到内存需求和计算时,它的成本很高,而且当您拥有大型数据集时,它几乎从来都不是一个好主意
  2. 您限制了本地处理数据的能力。稍微简化一下 Spark 数据帧只是一个包装器RDD[Row]。这意味着行越大,您可以在单个分区上放置的内容就越少,因此聚合等操作的成本要高得多,并且需要更多的网络流量。

当您拥有适当的列式存储并且可以实现高效压缩或聚合等功能时,宽表非常有用。从实用的角度来看,几乎所有可以用宽表做的事情都可以用长表使用组/窗口函数来完成。

您可以尝试的一件事是使用稀疏向量创建宽格式:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.StringIndexer
import sqlContext.implicits._

df.registerTempTable("df")
val dfComb = sqlContext.sql("SELECT s_id, CONCAT(Q, '\t', A) AS qa FROM df")

val indexer = new StringIndexer()
  .setInputCol("qa")
  .setOutputCol("idx")
  .fit(dfComb)

val indexed = indexer.transform(dfComb)

val n = indexed.agg(max("idx")).first.getDouble(0).toInt + 1

val wideLikeDF = indexed
  .select($"s_id", $"idx")
  .rdd
  .map{case Row(s_id: String, idx: Double) => (s_id, idx.toInt)}
  .groupByKey // This assumes no duplicates
  .mapValues(vals => Vectors.sparse(n, vals.map((_, 1.0)).toArray))
  .toDF("id", "qaVec")
Run Code Online (Sandbox Code Playgroud)

这里最酷的部分是您可以轻松地将其转换IndexedRowMatrix为例如计算 SVD

val mat = new IndexedRowMatrix(wideLikeDF.map{
  // Here we assume that s_id can be mapped directly to Long
  // If not it has to be indexed
  case Row(id: String, qaVec: SparseVector) => IndexedRow(id.toLong, qaVec)
})

val svd = mat.computeSVD(3)
Run Code Online (Sandbox Code Playgroud)

RowMatrix获取列统计信息或计算主成分:

val colStats = mat.toRowMatrix.computeColumnSummaryStatistic
val colSims = mat.toRowMatrix.columnSimilarities
val pc = mat.toRowMatrix.computePrincipalComponents(3)
Run Code Online (Sandbox Code Playgroud)

编辑

在 Spark 1.6.0+ 中,您可以使用pivot函数。