uza*_*ude 5 apache-spark apache-spark-sql
我们有一个常见的用例,即按照行的创建顺序对表进行重复数据删除。
例如,我们有用户操作的事件日志。用户时不时地标记他最喜欢的类别。在我们的分析阶段,我们只想知道用户最后最喜欢的类别。
示例数据:
id  action_type value date 
123 fav_category 1    2016-02-01
123 fav_category 4    2016-02-02
123 fav_category 8    2016-02-03
123 fav_category 2    2016-02-04
我们只想根据日期列获取最新的更新。我们当然可以用 sql 来做:
select * from (
  select *, row_number() over (
      partition by id,action_type order by date desc) as rnum from tbl
  ) 
where rnum=1;
但是,它不会在映射器端部分聚合,我们会将所有数据混洗到减速器。
我已经发布了一个包含此问题SPARK-17662的 Jira ,并以更好的 SQL 风格建议关闭了它:
select id,
       action_type,
       max(struct(date, *)) last_record
from   tbl
group by id,action_type
虽然这个解决方案更加干净,但仍然存在两个问题:
我们最终为此编写了一个 UDAF,它克服了问题 1,但仍然遇到问题 2。
有人知道更好的解决方案吗?
对于任何想要我们当前解决方案的人。这是 UDAF 的代码 - 请注意,我们必须使用一些内部函数,因此我们位于包 org.apache.spark.sql.types 中:
package org.apache.spark.sql.types
case class MaxValueByKey(child1: Expression, child2: Expression) extends DeclarativeAggregate {
  override def children: Seq[Expression] = child1 :: child2 :: Nil
  override def nullable: Boolean = true
  // Return data type.
  override def dataType: DataType = child2.dataType
  // Expected input data type.
  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, AnyDataType)
  override def checkInputDataTypes(): TypeCheckResult =
    TypeUtils.checkForOrderingExpr(child1.dataType, "function max")
  private lazy val max = AttributeReference("max", child1.dataType)()
  private lazy val data = AttributeReference("data", child2.dataType)()
  override lazy val aggBufferAttributes: Seq[AttributeReference] = max :: data :: Nil
  override lazy val initialValues: Seq[Expression] = Seq(
    Literal.create(null, child1.dataType),
    Literal.create(null, child2.dataType)
  )
  override lazy val updateExpressions: Seq[Expression] =
    chooseKeyValue(max, data, child1, child2)
  override lazy val mergeExpressions: Seq[Expression] =
    chooseKeyValue(max.left, data.left, max.right, data.right)
  def chooseKeyValue(key1:Expression, value1: Expression, key2:Expression, value2: Expression) = Seq(
    If(IsNull(key1), key2,  If(IsNull(key2), key1,    If(GreaterThan(key1, key2), key1, key2))),
    If(IsNull(key1), value2, If(IsNull(key2), value1, If(GreaterThan(key1, key2), value1, value2)))
  )
  override lazy val evaluateExpression: AttributeReference = data
}
object SparkMoreUDAFs {
  def maxValueByKey(key: Column, value: Column): Column =
      Column(MaxValueByKey(key.expr, value.expr).toAggregateExpression(false))
}
用法是:
sqlContext.table("tbl").groupBy($"id",$"action_type")
          .agg(SparkMoreUDAFs.maxValueByKey($"date", expr("struct(date,*)")).as("s"))
我不确定它是否非常优雅,但它可以进行地图端部分聚合并且适用于所有列类型。而且,我认为这个 UDAF 本身也很有用。
希望它能帮助别人..
| 归档时间: | 
 | 
| 查看次数: | 762 次 | 
| 最近记录: |