有没有办法重写Spark RDD distinct使用mapPartitions而不是distinct?

Gle*_*ker 6 scala shuffle distinct apache-spark rdd

我有一个RDD,它太大而不能一致地执行一个不同的语句而没有虚假错误(例如,SparkException阶段失败4次,ExecutorLostFailure,HDFS文件系统关闭,最大执行程序失败次数,阶段因SparkContext关闭而被取消,等等)

我试图计算特定列中的不同ID,例如:

print(myRDD.map(a => a._2._1._2).distinct.count())
Run Code Online (Sandbox Code Playgroud)

是否有一种简单,一致,不太随机密集的方式来执行上面的命令,可能使用mapPartitions,reduceByKey,flatMap或其他使用较少shuffle而不是不同的命令?

另请参阅导致Shuffle的Spark转换是什么?

Jus*_*ony 3

弄清楚是否存在另一个潜在问题可能会更好,但下面的内容将做您想要的......而不是迂回的方式来做到这一点,但听起来它会符合您的要求:

myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(Set[YourType]())((agg, value) => agg + value, (agg1, agg2) => agg1 ++ agg2) 
  .keys
  .count
Run Code Online (Sandbox Code Playgroud)

或者甚至这似乎有效,但它不是关联性和交换性的。它的工作原理取决于 Spark 的内部工作原理...但我可能会遗漏一个案例...所以虽然更简单,但我不确定我是否信任它:

myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(YourTypeDefault)((x,y)=>y, (x,y)=>x)
  .keys.count
Run Code Online (Sandbox Code Playgroud)