Spark distinct 的实现

Joh*_*ter 2 sorting scala dataframe apache-spark apache-spark-sql

我是 Spark 和 Scala 的新手。我正在阅读 Spark 的 distinct() 函数。但我找不到任何适当的细节。我有一些我无法解决的疑问,并已将它们写下来。

  1. 在 Spark 中如何实现 distinct() ?

    我不太擅长使用 Spark 源代码来识别整个流程。当我检查执行计划时,我只能看到一个 ShuffleRDD

  2. distinct 的时间复杂度是多少?

    我还从 Google 搜索中发现,它还以某种方式使用了散列和排序。

    所以,我想它是否使用与在 Hashset 的帮助下从数组中获取唯一元素相同的原理。如果它是一个系统,我会猜到时间复杂度是 O(nlogn) 。

    但是它分布在许多分区中并被打乱,时间复杂度的顺序是什么?

  3. 有没有办法避免在特定情况下改组?

    如果我确保按照我的用例正确分区我的数据,我可以避免改组吗?

    即,例如,假设在具有唯一行的数据框中分解一个 ArrayType 列会创建新行,而其他列被复制。我将选择其他列。通过这种方式,我确保每个分区的重复项都是唯一的。因为我知道每个分区的重复项是唯一的,所以我可以避免洗牌,只是敏锐地删除该分区中的重复项

我还发现这是否 spark 的 distinct() 函数只对每个分区中的不同元组进行洗牌

谢谢你的帮助 。如果我在任何地方错了,请纠正我。

小智 6

在 Spark 中如何实现 distinct() ?

通过应用具有None值的虚拟聚合。大致

rdd.map((_, None)).reduceByKey((a, b) => a)
Run Code Online (Sandbox Code Playgroud)

distinct 的时间复杂度是多少?

鉴于该过程的整体复杂性,很难估计。它至少是 O(N log N),因为 shuffle 需要排序,但是考虑到构建额外的非核心数据结构(包括关联数组)所需的多个其他操作,序列化/反序列化数据可以更高,并且实际上由 IO 主导操作,而不是纯粹的算法复杂度。

有没有办法避免在特定情况下改组?

是的,如果保证潜在的重复项被放置在同一个分区上。,

您可以使用mapPartitions去重复数据,特别是如果数据已排序或以其他方式保证在孤立的邻域中具有重复项。如果没有这个,您可能会受到内存要求的限制,除非您接受概率过滤器(如布隆过滤器)的近似结果。

一般来说,虽然这是不可能的,但这样的操作将是非本地的。