Spark-sortWithInPartitions超过排序

Thi*_*790 4 apache-spark apache-spark-sql spark-cassandra-connector apache-spark-dataset

以下是代表员工in_date和out_date的样本数据集。我必须获取所有员工的最后in_time。

Spark在4节点独立群集上运行。

初始数据集:

员工ID -----入职日期-----离职日期

1111111     2017-04-20  2017-09-14 
1111111     2017-11-02  null 
2222222     2017-09-26  2017-09-26 
2222222     2017-11-28  null 
3333333     2016-01-07  2016-01-20 
3333333     2017-10-25  null 
Run Code Online (Sandbox Code Playgroud)

之后的数据集df.sort(col(in_date).desc())

员工编号-in_date ----- out_date

1111111   2017-11-02   null 
1111111   2017-04-20   2017-09-14 
2222222   2017-09-26   2017-09-26 
2222222   2017-11-28   null 
3333333   2017-10-25   null 
3333333   2016-01-07   2016-01-20 
Run Code Online (Sandbox Code Playgroud)
df.dropDup(EmployeeID):  
Run Code Online (Sandbox Code Playgroud)

输出

员工ID -----入职日期-----离职日期

1111111    2017-11-02    null 
2222222    2017-09-26    2017-09-26 
3333333    2016-01-07    2016-01-20 
Run Code Online (Sandbox Code Playgroud)

预期数据集:

员工ID -----入职日期-----离职日期

1111111    2017-11-02   null 
2222222    2017-11-28   null 
3333333    2017-10-25   null 
Run Code Online (Sandbox Code Playgroud)

但是,当我使用进行初始数据集排序sortWithInPartitions并进行重复数据删除时,我得到了预期的数据集。我在这里错过了大大小小的东西吗?任何帮助表示赞赏。

附加信息: 当在本地模式下用Spark执行df.sort时,实现了上述预期输出。
我没有做任何分区,重新分区。初始数据集是从基础Cassandra数据库获得的。

use*_*411 6

TL; DR除非明确保证,否则您永远不应假定Spark中的操作将以任何特定顺序执行,尤其是在使用Spark SQL时。

您在这里缺少的是随机播放。dropDuplicates实现等效于:

df.groupBy(idCols).agg(first(c) for c in nonIdCols)
Run Code Online (Sandbox Code Playgroud)

将执行为:

  • 部分(“地图端”)聚合。
  • 随机播放。
  • 最终(“减少端”)聚合。

中间混洗会引入不确定性,并且不能保证最终聚合将以任何特定顺序进行。

当在本地模式下用Spark执行df.sort时,可以达到上述预期输出。

local模式相当简单。您永远不要使用它来得出关于完全分布式模式下Spark内部构件行为的结论。

当我使用sortWithInPartitions对初始数据集进行排序并进行重复数据删除时,我得到了预期的数据集。

如果以前使用来对数据进行分区,这将是有意义的EmployeeID。在那种情况下,Spark不需要额外的洗牌。

根据描述,您似乎应该使用如何选择每个组的第一行中显示的解决方案之一