pre*_*eti 23 distinct apache-spark
我是Apache Spark的新手,正在学习基本的功能.有一个小小的疑问.假设我有一个元组的RDD(键,值),并希望从中获得一些独特的元素.我使用distinct()函数.我想知道函数在什么基础上认为元组是完全不同的?它是基于键,值,还是两者兼而有之?
Gle*_*ker 24
.distinct()肯定会跨分区进行洗牌.要查看更多正在发生的事情,请在RDD上运行.toDebugString.
val hashPart = new HashPartitioner(<number of partitions>)
val myRDDPreStep = <load some RDD>
val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)
Run Code Online (Sandbox Code Playgroud)
对于我所拥有的RDD示例(myRDDPreStep已经被密钥哈希分区,由StorageLevel.MEMORY_AND_DISK_SER持久化,并且检查点),返回:
(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
| CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
| myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]
Run Code Online (Sandbox Code Playgroud)
请注意,如果您的RDD已经以智能方式分区并且分区不会过度倾斜,那么可能有更有效的方法来获得涉及更少shuffle的独特方式.
请参阅有没有办法重写Spark RDD distinct以使用mapPartition而不是distinct? 和 Apache Spark:使用RDD.aggregateByKey()的RDD.groupByKey()的等效实现是什么?
Pau*_*aul 10
RDD.distinct()的API文档仅提供一个句子描述:
"返回一个包含此RDD中不同元素的新RDD."
从最近的经验我可以告诉你,在一个元组-RDD中,整个元组都被考虑了.
如果您需要不同的键或不同的值,那么根据您想要完成的内容,您可以:
A.呼叫groupByKey()转换{(k1,v11),(k1,v12),(k2,v21),(k2,v22)}到{(k1,[v11,v12]), (k2,[v21,v22])}; 要么
B.通过调用keys()或values()后跟来删除键或值distinct()
在撰写本文时(2015年6月),加州大学伯克利分校+ EdX正在运行一个免费的在线课程,介绍大数据和Apache Spark,它将提供这些功能的实践.
Justin Pihony是对的.Distinct使用对象的hashCode和equals方法进行此确定.它返回不同的元素(对象)
val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
Run Code Online (Sandbox Code Playgroud)
不同
rdd.distinct.collect().foreach(println)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)
Run Code Online (Sandbox Code Playgroud)
如果要在密钥上应用distinct.在这种情况下,减少是更好的选择
ReduceBy
val reduceRDD= rdd.map(tup =>
(tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2)
reduceRDD.collect().foreach(println)
Run Code Online (Sandbox Code Playgroud)
输出: -
(2,20)
(1,20)
(3,21)
Run Code Online (Sandbox Code Playgroud)
distinct使用对象的hashCode和equals方法进行此确定。元组内置了平等机制,该平等机制下放到每个对象的平等和位置。因此,distinct将对整个Tuple2对象起作用。正如保罗指出,你可以打电话keys或values再distinct。或者,您可以通过编写自己的不同值aggregateByKey,以保持密钥配对。或者,如果您需要不同的键,则可以使用常规键aggregate
| 归档时间: |
|
| 查看次数: |
66557 次 |
| 最近记录: |