Thi*_*790 4 apache-spark apache-spark-sql spark-cassandra-connector apache-spark-dataset
以下是代表员工in_date和out_date的样本数据集。我必须获取所有员工的最后in_time。
Spark在4节点独立群集上运行。
初始数据集:
员工ID -----入职日期-----离职日期
1111111 2017-04-20 2017-09-14
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2016-01-07 2016-01-20
3333333 2017-10-25 null
Run Code Online (Sandbox Code Playgroud)
之后的数据集df.sort(col(in_date).desc()):
员工编号-in_date ----- out_date
1111111 2017-11-02 null
1111111 2017-04-20 2017-09-14
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2017-10-25 null
3333333 2016-01-07 2016-01-20
Run Code Online (Sandbox Code Playgroud)
df.dropDup(EmployeeID):
Run Code Online (Sandbox Code Playgroud)
输出:
员工ID -----入职日期-----离职日期
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
3333333 2016-01-07 2016-01-20
Run Code Online (Sandbox Code Playgroud)
预期数据集:
员工ID -----入职日期-----离职日期
1111111 2017-11-02 null
2222222 2017-11-28 null
3333333 2017-10-25 null
Run Code Online (Sandbox Code Playgroud)
但是,当我使用进行初始数据集排序sortWithInPartitions并进行重复数据删除时,我得到了预期的数据集。我在这里错过了大大小小的东西吗?任何帮助表示赞赏。
附加信息:
当在本地模式下用Spark执行df.sort时,实现了上述预期输出。
我没有做任何分区,重新分区。初始数据集是从基础Cassandra数据库获得的。
TL; DR除非明确保证,否则您永远不应假定Spark中的操作将以任何特定顺序执行,尤其是在使用Spark SQL时。
您在这里缺少的是随机播放。dropDuplicates实现等效于:
df.groupBy(idCols).agg(first(c) for c in nonIdCols)
Run Code Online (Sandbox Code Playgroud)
将执行为:
中间混洗会引入不确定性,并且不能保证最终聚合将以任何特定顺序进行。
当在本地模式下用Spark执行df.sort时,可以达到上述预期输出。
local模式相当简单。您永远不要使用它来得出关于完全分布式模式下Spark内部构件行为的结论。
当我使用sortWithInPartitions对初始数据集进行排序并进行重复数据删除时,我得到了预期的数据集。
如果以前使用来对数据进行分区,这将是有意义的EmployeeID。在那种情况下,Spark不需要额外的洗牌。
根据描述,您似乎应该使用如何选择每个组的第一行中显示的解决方案之一。。
| 归档时间: |
|
| 查看次数: |
3399 次 |
| 最近记录: |