Spark:转换DataFrame而不进行聚合

nev*_*_me 8 scala apache-spark

我在网上看了很多问题,但他们似乎没有做我想要实现的目标.

我正在使用带有Scala的Apache Spark 2.0.2.

我有一个数据帧:

+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
|         1|  100|   0|   0|   0|   0|   0|
|         2|    0|  50|   0|   0|  20|   0|
|         3|    0|   0|   0|   0|   0|   0|
|         4|    0|   0|   0|   0|   0|   0|
+----------+-----+----+----+----+----+----+
Run Code Online (Sandbox Code Playgroud)

我想要转置

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
|val2|    0|  50|   0|   0|
|val3|    0|   0|   0|   0|
|val4|    0|   0|   0|   0|
|val5|    0|  20|   0|   0|
|val6|    0|   0|   0|   0|
+----+-----+----+----+----+
Run Code Online (Sandbox Code Playgroud)

我尝试过使用pivot()但我找不到正确的答案.我最终循环遍历我的val{x}列,并按照下面的方式旋转,但事实证明这很慢.

val d = df.select('segment_id, 'val1)

+----------+-----+
|segment_id| val1|
+----------+-----+
|         1|  100|
|         2|    0|
|         3|    0|
|         4|    0|
+----------+-----+

d.groupBy('val1).sum().withColumnRenamed('val1', 'vals')

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
+----+-----+----+----+----+
Run Code Online (Sandbox Code Playgroud)

然后union()val{x}我的第一个数据帧的每次迭代中使用.

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val2|    0|  50|   0|   0|
+----+-----+----+----+----+
Run Code Online (Sandbox Code Playgroud)

是否有一种更有效的转置方式,我不想聚合数据?

谢谢 :)

use*_*411 9

不幸的是,没有时间:

  • DataFrame考虑到数据量,Spark 是合理的.
  • 转换数据是可行的.

您必须记住,DataFrame在Spark中实现的是行的分布式集合,并且每个行都在单个节点上存储和处理.

你可以在表达换位DataFramepivot:

val kv = explode(array(df.columns.tail.map { 
  c => struct(lit(c).alias("k"), col(c).alias("v")) 
}: _*))

df
  .withColumn("kv", kv)
  .select($"segment_id", $"kv.k", $"kv.v")
  .groupBy($"k")
  .pivot("segment_id")
  .agg(first($"v"))
  .orderBy($"k")
  .withColumnRenamed("k", "vals")
Run Code Online (Sandbox Code Playgroud)

但它只是一个没有实际应用的玩具代码.在实践中,它并不比收集数据更好:

val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
  case Array(h, t @ _*) => {
    (h.map(_.toString), t.map(_.collect { case x: Int => x }))
  }
}

val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }
val schema = StructType(
  StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
)

spark.createDataFrame(sc.parallelize(rows), schema)
Run Code Online (Sandbox Code Playgroud)

对于DataFrame定义为:

val df = Seq(
  (1, 100, 0, 0, 0, 0, 0),
  (2, 0, 50, 0, 0, 20, 0),
  (3, 0, 0, 0, 0, 0, 0),
  (4, 0, 0, 0, 0, 0, 0)
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
Run Code Online (Sandbox Code Playgroud)

你会给你想要的结果吗?

+----+---+---+---+---+
|vals|  1|  2|  3|  4|
+----+---+---+---+---+
|val1|100|  0|  0|  0|
|val2|  0| 50|  0|  0|
|val3|  0|  0|  0|  0|
|val4|  0|  0|  0|  0|
|val5|  0| 20|  0|  0|
|val6|  0|  0|  0|  0|
+----+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)

话虽如此,如果您需要在分布式数据结构上进行有效的转置,您将不得不寻找其他地方.有许多结构,包括核心CoordinateMatrixBlockMatrix,可以跨两个维度分布数据,并可以转置.