I have a Spark 2.0 dataframe example with the following structure:
id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
It contains 24 entries for each id (one for each hour of the day), and it is ordered by id, hour using the orderBy function.
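For illustration, here is a minimal sketch of how a DataFrame with this shape could be built and sorted; the SparkSession setup and the literal rows are assumptions for the example, not the original data source:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hourly-counts")
  .master("local[*]") // local master assumed for the sketch
  .getOrCreate()
import spark.implicits._

// Hypothetical sample rows matching the structure above (abbreviated)
val example = Seq(
  ("id1", 0, 12), ("id1", 1, 55), ("id1", 23, 44),
  ("id2", 0, 12), ("id2", 1, 89), ("id2", 23, 34)
).toDF("id", "hour", "count")
  .orderBy("id", "hour") // sorted by id, then hour, as described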
I created an aggregator groupConcat:
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator

def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
  override def zero: String = ""
  override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
  override def merge(b1: String, b2: String) = b1 + b2
  override def finish(b: String) = b.substring(1) // drop the leading separator
  override def bufferEncoder: Encoder[String] = Encoders.STRING
  override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn
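For context, a hedged sketch of how the aggregator might be invoked; the alias "concatenated" and the column index 2 (pointing at count) are illustrative assumptions:

// Hypothetical call site: concatenate the count column (index 2) per id,
// relying on the preceding orderBy for the hour ordering
val result = example
  .orderBy("id", "hour")
  .groupBy("id")
  .agg(groupConcat(",", 2).as("concatenated"))

result.show(truncate = false)

With the sample data, each id would yield a single comma-separated string of its hourly counts.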