Spark DataFrame:在orderBy维护该命令之后是否groupBy?

Ana*_*dor 16 scala apache-spark spark-streaming apache-spark-sql spark-dataframe

我有一个Spark 2.0数据帧,example具有以下结构:

id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
Run Code Online (Sandbox Code Playgroud)

它包含每个id的24个条目(一天中每小时一个),并使用orderBy函数按id,小时排序.

我创建了一个聚合器groupConcat:

  def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
    override def zero: String = ""

    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)

    override def merge(b1: String, b2: String) = b1 + b2

    override def finish(b: String) = b.substring(1)

    override def bufferEncoder: Encoder[String] = Encoders.STRING

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }.toColumn
Run Code Online (Sandbox Code Playgroud)

它帮助我将列连接成字符串以获取最终的数据帧:

id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.
Run Code Online (Sandbox Code Playgroud)

我的问题是,如果我这样做example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"),是否可以保证每小时的数量将在各自的桶中正确排序?

我读到这不一定是RDD的情况(参见Spark按键排序然后分组才能获得有序可迭代?),但是对于DataFrames可能有所不同?

如果没有,我该如何解决呢?

Ada*_*air 18

groupBy在orderBy之后没有维持秩序,正如其他人所指出的那样.你想要做的是使用一个Window函数 - id上的分区和按小时排序.您可以对此进行collect_list,然后获取结果列表中的最大(最大),因为它们累积起来(即第一个小时将只在列表中出现,第二个小时将在列表中有2个元素,依此类推).

完整的示例代码:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val data = Seq(( "id1", 0, 12),
  ("id1", 1, 55),
  ("id1", 23, 44),
  ("id2", 0, 12),
  ("id2", 1, 89),
  ("id2", 23, 34)).toDF("id", "hour", "count")

    val mergeList = udf{(strings: Seq[String]) => strings.mkString(":")}
    data.withColumn("collected", collect_list($"count")
                                                    .over(Window.partitionBy("id")
                                                                 .orderBy("hour")))
            .groupBy("id")
            .agg(max($"collected").as("collected"))
            .withColumn("hourly_count", mergeList($"collected"))
            .select("id", "hourly_count").show
Run Code Online (Sandbox Code Playgroud)

这使我们保持在DataFrame世界中.我还简化了OP使用的UDF代码.

输出:

+---+------------+
| id|hourly_count|
+---+------------+
|id1|    12:55:44|
|id2|    12:89:34|
+---+------------+
Run Code Online (Sandbox Code Playgroud)


小智 5

我有一个案例,订单并不总是保持:有时是,大多数没有.

我的数据帧在Spark 1.6上运行了200个分区

df_group_sort = data.orderBy(times).groupBy(group_key).agg(
                                                  F.sort_array(F.collect_list(times)),
                                                  F.collect_list(times)
                                                           )
Run Code Online (Sandbox Code Playgroud)

检查排序我比较的返回值

F.sort_array(F.collect_list(times))
Run Code Online (Sandbox Code Playgroud)

F.collect_list(times)
Run Code Online (Sandbox Code Playgroud)

给出eg(left:sort_array(collect_list()); right:collect_list())

2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000
Run Code Online (Sandbox Code Playgroud)

左列始终排序,而右列仅由排序块组成.对于take()的不同执行,右列中块的顺序是不同的.


小智 5

如果要解决Java中的实现问题(Scala和Python应该相似):

example.orderBy(“hour”)
.groupBy(“id”)
.agg(functions.sort_array(
  functions.collect_list( 
     functions.struct(dataRow.col(“hour”),
                      dataRow.col(“count”))),false)
 .as(“hourly_count”));
Run Code Online (Sandbox Code Playgroud)