uza*_*ude 5 apache-spark apache-spark-sql
我们有一个常见的用例,即按照行的创建顺序对表进行重复数据删除。
例如,我们有用户操作的事件日志。用户时不时地标记他最喜欢的类别。在我们的分析阶段,我们只想知道用户最后最喜欢的类别。
示例数据:
id action_type value date
123 fav_category 1 2016-02-01
123 fav_category 4 2016-02-02
123 fav_category 8 2016-02-03
123 fav_category 2 2016-02-04
Run Code Online (Sandbox Code Playgroud)
我们只想根据日期列获取最新的更新。我们当然可以用 sql 来做:
select * from (
select *, row_number() over (
partition by id,action_type order by date desc) as rnum from tbl
)
where rnum=1;
Run Code Online (Sandbox Code Playgroud)
但是,它不会在映射器端部分聚合,我们会将所有数据混洗到减速器。
我已经发布了一个包含此问题SPARK-17662的 Jira ,并以更好的 SQL 风格建议关闭了它:
select id,
action_type,
max(struct(date, *)) last_record
from tbl
group by id,action_type
Run Code Online (Sandbox Code Playgroud)
虽然这个解决方案更加干净,但仍然存在两个问题:
我们最终为此编写了一个 UDAF,它克服了问题 1,但仍然遇到问题 2。
有人知道更好的解决方案吗?
对于任何想要我们当前解决方案的人。这是 UDAF 的代码 - 请注意,我们必须使用一些内部函数,因此我们位于包 org.apache.spark.sql.types 中:
package org.apache.spark.sql.types
case class MaxValueByKey(child1: Expression, child2: Expression) extends DeclarativeAggregate {
override def children: Seq[Expression] = child1 :: child2 :: Nil
override def nullable: Boolean = true
// Return data type.
override def dataType: DataType = child2.dataType
// Expected input data type.
override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, AnyDataType)
override def checkInputDataTypes(): TypeCheckResult =
TypeUtils.checkForOrderingExpr(child1.dataType, "function max")
private lazy val max = AttributeReference("max", child1.dataType)()
private lazy val data = AttributeReference("data", child2.dataType)()
override lazy val aggBufferAttributes: Seq[AttributeReference] = max :: data :: Nil
override lazy val initialValues: Seq[Expression] = Seq(
Literal.create(null, child1.dataType),
Literal.create(null, child2.dataType)
)
override lazy val updateExpressions: Seq[Expression] =
chooseKeyValue(max, data, child1, child2)
override lazy val mergeExpressions: Seq[Expression] =
chooseKeyValue(max.left, data.left, max.right, data.right)
def chooseKeyValue(key1:Expression, value1: Expression, key2:Expression, value2: Expression) = Seq(
If(IsNull(key1), key2, If(IsNull(key2), key1, If(GreaterThan(key1, key2), key1, key2))),
If(IsNull(key1), value2, If(IsNull(key2), value1, If(GreaterThan(key1, key2), value1, value2)))
)
override lazy val evaluateExpression: AttributeReference = data
}
object SparkMoreUDAFs {
def maxValueByKey(key: Column, value: Column): Column =
Column(MaxValueByKey(key.expr, value.expr).toAggregateExpression(false))
}
Run Code Online (Sandbox Code Playgroud)
用法是:
sqlContext.table("tbl").groupBy($"id",$"action_type")
.agg(SparkMoreUDAFs.maxValueByKey($"date", expr("struct(date,*)")).as("s"))
Run Code Online (Sandbox Code Playgroud)
我不确定它是否非常优雅,但它可以进行地图端部分聚合并且适用于所有列类型。而且,我认为这个 UDAF 本身也很有用。
希望它能帮助别人..
| 归档时间: |
|
| 查看次数: |
762 次 |
| 最近记录: |