Bab*_*abu 5 scala dataframe apache-spark apache-spark-sql
我有一个包含两列数据的数据框,如下所示
+----+-----------------+
|acct| device|
+----+-----------------+
| B| List(3, 4)|
| C| List(3, 5)|
| A| List(2, 6)|
| B|List(3, 11, 4, 9)|
| C| List(5, 6)|
| A|List(2, 10, 7, 6)|
+----+-----------------+
Run Code Online (Sandbox Code Playgroud)
我需要如下结果
+----+-----------------+
|acct| device|
+----+-----------------+
| B|List(3, 4, 11, 9)|
| C| List(3, 5, 6)|
| A|List(2, 6, 7, 10)|
+----+-----------------+
Run Code Online (Sandbox Code Playgroud)
我试过如下,但似乎不起作用
df.groupBy("acct").agg(concat("device"))
df.groupBy("acct").agg(collect_set("device"))
请让我知道如何使用 Scala 实现这一目标?
您可以从分解device列开始,然后像以前一样继续 - 但请注意,它可能不会保留列表的顺序(无论如何在任何分组依据中都不能保证):
val result = df.withColumn("device", explode($"device"))
.groupBy("acct")
.agg(collect_set("device"))
result.show(truncate = false)
// +----+-------------------+
// |acct|collect_set(device)|
// +----+-------------------+
// |B |[9, 3, 4, 11] |
// |C |[5, 6, 3] |
// |A |[2, 6, 10, 7] |
// +----+-------------------+
Run Code Online (Sandbox Code Playgroud)
另一个可能比该选项表现更好的选项explode:创建您自己的UserDefinedAggregationFunction,将列表合并到不同的集合中。
您必须UserDefinedAggregateFunction按如下方式扩展:
class MergeListsUDAF extends UserDefinedAggregateFunction {
override def inputSchema: StructType = StructType(Seq(StructField("a", ArrayType(IntegerType))))
override def bufferSchema: StructType = inputSchema
override def dataType: DataType = ArrayType(IntegerType)
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, mutable.Seq[Int]())
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val existing = buffer.getAs[mutable.Seq[Int]](0)
val newList = input.getAs[mutable.Seq[Int]](0)
val result = (existing ++ newList).distinct
buffer.update(0, result)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)
override def evaluate(buffer: Row): Any = buffer.getAs[mutable.Seq[Int]](0)
}
Run Code Online (Sandbox Code Playgroud)
并像这样使用它:
val mergeUDAF = new MergeListsUDAF()
df.groupBy("acct").agg(mergeUDAF($"device"))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6900 次 |
| 最近记录: |