根据RDD/Spark DataFrame中的特定列从行中删除重复项

Jas*_*son 53 apache-spark apache-spark-sql pyspark

假设我有一个相当大的数据集,形式如下:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])
Run Code Online (Sandbox Code Playgroud)

我想要做的是仅根据第一,第三和第四列的值删除重复的行.

删除完全重复的行很简单:

data = data.distinct()
Run Code Online (Sandbox Code Playgroud)

第5行或第6行将被删除

但是,我如何仅删除基于第1,3和4列的重复行?即删除以下任何一个:

('Baz',22,'US',6)
('Baz',36,'US',6)
Run Code Online (Sandbox Code Playgroud)

在Python中,这可以通过使用指定列来完成.drop_duplicates().我怎样才能在Spark/Pyspark中实现同样的目标?

vae*_*r-k 60

Pyspark 确实包括一种dropDuplicates()方法.https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+
Run Code Online (Sandbox Code Playgroud)

也许它是在@Jason(OP)使用的后续版本中引入的?

编辑:是的,它是在1.4中引入的

  • 有没有办法捕获它确实丢失的记录? (2认同)
  • x = usersDf.drop_duplicates(subset=['DETUserId']) - X 数据帧将是所有删除的记录 (2认同)
  • @Rodney这不是文档所说的:“返回一个删除了重复行的新数据帧,可以选择仅考虑某些列。” https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates (2认同)

小智 24

根据您的问题,您不清楚要使用哪些列来确定重复项.解决方案背后的一般思想是根据标识重复项的列的值创建密钥.然后,您可以使用reduceByKey或reduce操作来消除重复项.

以下是一些可以帮助您入门的代码:

def get_key(x):
    return "{0}{1}{2}".format(x[0],x[2],x[3])

m = data.map(lambda x: (get_key(x),x))
Run Code Online (Sandbox Code Playgroud)

现在,你有一个键值RDD由列1,3和4,下一步将键控是A reduceByKeygroupByKeyfilter.这将消除重复.

r = m.reduceByKey(lambda x,y: (x))
Run Code Online (Sandbox Code Playgroud)


Dav*_*fin 12

我知道你已经接受了另一个答案,但是如果你想把它做成一个DataFrame,只需使用groupBy和agg.假设您已经创建了DF(列名为"col1","col2"等),您可以执行以下操作:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")
Run Code Online (Sandbox Code Playgroud)

请注意,在这种情况下,我选择了最大col2,但你可以做avg,min等.

  • 到目前为止,我对 DataFrame 的体验是它们让一切变得更加优雅并且速度更快。 (2认同)

小智 11

同意大卫.要添加,我们可能不希望groupBy聚合函数中的列以外的所有列,即,如果我们想要纯粹基于列的子集删除重复项并保留原始数据框中的所有列.因此,更好的方法是使用Spark 1.4.0中提供的dropDuplicates Dataframe api

供参考,请参阅:https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame


Ara*_*mar 9

我使用了内置函数dropDuplicates().Scala代码如下

val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")

data.dropDuplicates(Array("x","count")).show()
Run Code Online (Sandbox Code Playgroud)

输出:

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+
Run Code Online (Sandbox Code Playgroud)


Sam*_*mar 5

下面的程序将帮助您删除整个重复项,或者如果您想根据某些列删除重复项,您甚至可以这样做:

import org.apache.spark.sql.SparkSession

object DropDuplicates {

    def main(args: Array[String]) {
        val spark =
            SparkSession.builder()
                .appName("DataFrame-DropDuplicates")
                .master("local[4]")
                .getOrCreate()

        import spark.implicits._

        // Create an RDD of tuples with some data
        val custs = Seq(
            (1, "Widget Co", 120000.00, 0.00, "AZ"),
            (2, "Acme Widgets", 410500.00, 500.00, "CA"),
            (3, "Widgetry", 410500.00, 200.00, "CA"),
            (4, "Widgets R Us", 410500.00, 0.0, "CA"),
            (3, "Widgetry", 410500.00, 200.00, "CA"),
            (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
            (6, "Widget Co", 12000.00, 10.00, "AZ")
        )
        val customerRows = spark.sparkContext.parallelize(custs, 4)

        // Convert RDD of tuples to DataFrame by supplying column names
        val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")

        println("*** Here's the whole DataFrame with duplicates")

        customerDF.printSchema()

        customerDF.show()

        // Drop fully identical rows
        val withoutDuplicates = customerDF.dropDuplicates()

        println("*** Now without duplicates")

        withoutDuplicates.show()

        val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))

        println("*** Now without partial duplicates too")

        withoutPartials.show()
    }

}
Run Code Online (Sandbox Code Playgroud)