SH *_* Y. 5 scala r apache-spark apache-spark-sql
我正在尝试使用 Spark DataFrame API 将我的数据框从长到宽重塑。数据集是学生提问的问题和答案的集合。这是一个巨大的数据集,Q(问题)和 A(答案)大约范围从 1 到 50000。我想收集所有可能的 Q*A 对并使用它们来构建列。如果学生对问题 1 的回答为 1,我们将值 1 分配给第 1_1 列。否则,我们给它一个0。数据集已经在S_ID、Q、A上进行了去重。
在 R 中,我可以简单地使用库 reshape2 中的 dcast,但我不知道如何使用 Spark 来做到这一点。我在下面的链接中找到了旋转的解决方案,但它需要固定数量的不同对的 Q*A。 http://rajasoftware.net/index.php/database/91446/scala-apache-spark-pivot-dataframes-pivot-spark-dataframe
我还尝试使用用户定义的函数连接 Q 和 A,并应用交叉表但是,我从控制台收到以下错误,即使到目前为止我只在示例数据文件上测试我的代码 -
The maximum limit of le6 pairs have been collected, which may not be all of the pairs.
Please try reducing the amount of distinct items in your columns.
Run Code Online (Sandbox Code Playgroud)
原始数据:
S_ID、Q、A
1、1、1
1、2、2
1、3、3
2、1、1
2、2、3
2、3、4
2、4、5
=> 长到宽变换后:
S_ID、QA_1_1、QA_2_2、QA_3_3 、QA_2_3、QA_3_4、QA_4_5 1、1、1、1、0、0、0
2、1、0、0、1、1、1
R code.
library(dplyr); library(reshape2);
df1 <- df %>% group_by(S_ID, Q, A) %>% filter(row_number()==1) %>% mutate(temp=1)
df1 %>% dcast(S_ID ~ Q + A, value.var="temp", fill=0)
Spark code.
val fnConcatenate = udf((x: String, y: String) => {"QA_"+ x +"_" + y})
df1 = df.distinct.withColumn("QA", fnConcatenate($"Q", $"A"))
df2 = stat.crosstab("S_ID", "QA")
Run Code Online (Sandbox Code Playgroud)
任何想法将不胜感激。
您在这里尝试执行的操作在设计上是错误的,原因有两个:
RDD[Row]。这意味着行越大,您可以在单个分区上放置的内容就越少,因此聚合等操作的成本要高得多,并且需要更多的网络流量。当您拥有适当的列式存储并且可以实现高效压缩或聚合等功能时,宽表非常有用。从实用的角度来看,几乎所有可以用宽表做的事情都可以用长表使用组/窗口函数来完成。
您可以尝试的一件事是使用稀疏向量创建宽格式:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.StringIndexer
import sqlContext.implicits._
df.registerTempTable("df")
val dfComb = sqlContext.sql("SELECT s_id, CONCAT(Q, '\t', A) AS qa FROM df")
val indexer = new StringIndexer()
.setInputCol("qa")
.setOutputCol("idx")
.fit(dfComb)
val indexed = indexer.transform(dfComb)
val n = indexed.agg(max("idx")).first.getDouble(0).toInt + 1
val wideLikeDF = indexed
.select($"s_id", $"idx")
.rdd
.map{case Row(s_id: String, idx: Double) => (s_id, idx.toInt)}
.groupByKey // This assumes no duplicates
.mapValues(vals => Vectors.sparse(n, vals.map((_, 1.0)).toArray))
.toDF("id", "qaVec")
Run Code Online (Sandbox Code Playgroud)
这里最酷的部分是您可以轻松地将其转换IndexedRowMatrix为例如计算 SVD
val mat = new IndexedRowMatrix(wideLikeDF.map{
// Here we assume that s_id can be mapped directly to Long
// If not it has to be indexed
case Row(id: String, qaVec: SparseVector) => IndexedRow(id.toLong, qaVec)
})
val svd = mat.computeSVD(3)
Run Code Online (Sandbox Code Playgroud)
或RowMatrix获取列统计信息或计算主成分:
val colStats = mat.toRowMatrix.computeColumnSummaryStatistic
val colSims = mat.toRowMatrix.columnSimilarities
val pc = mat.toRowMatrix.computePrincipalComponents(3)
Run Code Online (Sandbox Code Playgroud)
编辑:
在 Spark 1.6.0+ 中,您可以使用pivot函数。