小编uza*_*ude的帖子

Spark Sql 行重复删除

我们有一个常见的用例,即按照行的创建顺序对表进行重复数据删除。

例如,我们有用户操作的事件日志。用户时不时地标记他最喜欢的类别。在我们的分析阶段,我们只想知道用户最后最喜欢的类别。

示例数据:

id  action_type value date 
123 fav_category 1    2016-02-01
123 fav_category 4    2016-02-02
123 fav_category 8    2016-02-03
123 fav_category 2    2016-02-04
Run Code Online (Sandbox Code Playgroud)

我们只想根据日期列获取最新的更新。我们当然可以用 sql 来做:

select * from (
  select *, row_number() over (
      partition by id,action_type order by date desc) as rnum from tbl
  ) 
where rnum=1;
Run Code Online (Sandbox Code Playgroud)

但是,它不会在映射器端部分聚合,我们会将所有数据混洗到减速器。

我已经发布了一个包含此问题SPARK-17662的 Jira ,并以更好的 SQL 风格建议关闭了它:

select id,
       action_type,
       max(struct(date, *)) last_record
from   tbl
group by id,action_type
Run Code Online (Sandbox Code Playgroud)

虽然这个解决方案更加干净,但仍然存在两个问题:

  1. 如果其中一个字段不可排序(例如 map<>),则此技巧不起作用
  2. 如果稍后在流程中我们仅选择某些字段,则我们将无法获得下推谓词来优化流程并从一开始就忽略不需要的字段。

我们最终为此编写了一个 UDAF,它克服了问题 1,但仍然遇到问题 2。

有人知道更好的解决方案吗?

apache-spark apache-spark-sql

5
推荐指数
1
解决办法
762
查看次数

标签 统计

apache-spark ×1

apache-spark-sql ×1