hom*_*mar 6 grouping scala apache-spark
我想重写一些用RDD编写的代码来使用DataFrames.在我找到这个之前,它工作得非常顺利:
events
.keyBy(row => (row.getServiceId + row.getClientCreateTimestamp + row.getClientId, row) )
.reduceByKey((e1, e2) => if(e1.getClientSendTimestamp <= e2.getClientSendTimestamp) e1 else e2)
.values
Run Code Online (Sandbox Code Playgroud)
它很简单
events
.groupBy(events("service_id"), events("client_create_timestamp"), events("client_id"))
Run Code Online (Sandbox Code Playgroud)
但下一步是什么?如果我想迭代当前组中的每个元素怎么办?它甚至可能吗?提前致谢.
GroupedData不能直接使用。数据没有物理分组,只是逻辑运算。您必须应用agg方法的一些变体,例如:
events
.groupBy($"service_id", $"client_create_timestamp", $"client_id")
.min("client_send_timestamp")
Run Code Online (Sandbox Code Playgroud)
或者
events
.groupBy($"service_id", $"client_create_timestamp", $"client_id")
.agg(min($"client_send_timestamp"))
Run Code Online (Sandbox Code Playgroud)
在哪里client_send_timestamp是您要聚合的列。
如果您想保留信息而不只是聚合join或使用窗口函数 - 请参阅在 Spark DataFrame 中查找每组的最大行数
Spark 还支持用户定义的聚合函数 - 请参阅如何在 Spark SQL 中定义和使用用户定义的聚合函数?
火花2.0+
您可以使用Dataset.groupByKey将组公开为迭代器。
| 归档时间: |
|
| 查看次数: |
3267 次 |
| 最近记录: |