火花数据帧丢弃重复并保持第一

ad_*_*d_s 14 duplicates dataframe apache-spark

问题:在丢弃重复项时,在pandas中,您可以指定要保留的列.Spark Dataframes中是否有等价物?

熊猫:

df.sort_values('actual_datetime', ascending=False).drop_duplicates(subset=['scheduled_datetime', 'flt_flightnumber'], keep='first')
Run Code Online (Sandbox Code Playgroud)

Spark数据帧(我使用Spark 1.6.0)没有keep选项

df.orderBy(['actual_datetime']).dropDuplicates(subset=['scheduled_datetime', 'flt_flightnumber'])
Run Code Online (Sandbox Code Playgroud)

想象一下'scheduled_datetime'和'flt_flightnumber'是第6,17列.通过基于这些列的值创建密钥,我们还可以进行重复数据删除

def get_key(x):
    return "{0}{1}".format(x[6],x[17])

df= df.map(lambda x: (get_key(x),x)).reduceByKey(lambda x,y: (x))
Run Code Online (Sandbox Code Playgroud)

但如何指定保留第一行并摆脱其他重复?最后一排怎么样?

tim*_*mle 25

对于每个说dropDuplicates都会保持第一次出现的人- 严格来说这不是正确的。

dropDuplicates保留排序操作的“第一次出现”-仅在有1个分区的情况下。请参见下面的示例。
但是,这对于大多数Spark数据集不切实际。因此,我还提供了一个使用Window函数+ sort + rank + filter的“首次出现”放置重复操作的示例。
例如,参见帖子底部。

使用pyspark在Spark 2.4.0中对此进行了测试。

dropDuplicates示例

import pandas as pd

# generating some example data with pandas, will convert to spark df below
df1 = pd.DataFrame({'col1':range(0,5)})
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame({'col1':range(0,5)})
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame({'col1':range(0,5)})
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
print(dfall)
Run Code Online (Sandbox Code Playgroud)
   col1     datestr
0     0  2018-01-01
1     1  2018-01-01
2     2  2018-01-01
3     3  2018-01-01
4     4  2018-01-01
0     0  2018-02-01
1     1  2018-02-01
2     2  2018-02-01
3     3  2018-02-01
4     4  2018-02-01
0     0  2018-03-01
1     1  2018-03-01
2     2  2018-03-01
3     3  2018-03-01
4     4  2018-03-01
Run Code Online (Sandbox Code Playgroud)
   col1     datestr
0     0  2018-01-01
1     1  2018-01-01
2     2  2018-01-01
3     3  2018-01-01
4     4  2018-01-01
0     0  2018-02-01
1     1  2018-02-01
2     2  2018-02-01
3     3  2018-02-01
4     4  2018-02-01
0     0  2018-03-01
1     1  2018-03-01
2     2  2018-03-01
3     3  2018-03-01
4     4  2018-03-01
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-03-01|
|   1|2018-02-01|
|   3|2018-02-01|
|   2|2018-02-01|
|   4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
# first example
# does not give first (based on datestr)
(spark.createDataFrame(dfall)
   .orderBy('datestr')
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates NOT based on occurrence of sorted datestr
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-02-01|
|   1|2018-01-01|
|   3|2018-02-01|
|   2|2018-02-01|
|   4|2018-02-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-03-01|
|   1|2018-02-01|
|   3|2018-02-01|
|   2|2018-02-01|
|   4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   2|2018-01-01|
|   3|2018-01-01|
|   4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
# second example
# testing what happens with repartition
(spark.createDataFrame(dfall)
   .orderBy('datestr')
   .repartition('datestr')
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates NOT based on occurrence of sorted datestr

Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-03-01|
|   1|2018-03-01|
|   2|2018-03-01|
|   3|2018-03-01|
|   4|2018-03-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)

窗口,排序,等级,过滤器示例

+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-02-01|
|   1|2018-01-01|
|   3|2018-02-01|
|   2|2018-02-01|
|   4|2018-02-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
#third example
# testing with coalesce(1)
(spark
   .createDataFrame(dfall)
   .orderBy('datestr')
   .coalesce(1)
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates based on occurrence of sorted datestr
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   3|2018-01-01|
|   2|2018-01-01|
|   4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   2|2018-01-01|
|   3|2018-01-01|
|   4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
   col1     datestr
0     0  2018-01-01
1     1  2018-01-01
2     2  2018-01-01
3     3  2018-01-01
4     4  2018-01-01
0     0  2018-01-01
1     1  2018-01-01
2     2  2018-01-01
3     3  2018-01-01
4     4  2018-01-01
0     0  2018-03-01
1     1  2018-03-01
2     2  2018-03-01
3     3  2018-03-01
4     4  2018-03-01
Run Code Online (Sandbox Code Playgroud)
# fourth example
# testing with reverse sort then coalesce(1)
(spark
   .createDataFrame(dfall)
   .orderBy('datestr', ascending = False)
   .coalesce(1)
   .dropDuplicates(subset = ['col1'])
   .show()
)
# dropDuplicates based on occurrence of sorted datestr```
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   0|2018-01-01|
|   1|2018-01-01|
|   1|2018-01-01|
|   3|2018-01-01|
|   3|2018-01-01|
|   2|2018-01-01|
|   2|2018-01-01|
|   4|2018-01-01|
|   4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-03-01|
|   1|2018-03-01|
|   2|2018-03-01|
|   3|2018-03-01|
|   4|2018-03-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   3|2018-01-01|
|   2|2018-01-01|
|   4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)

  • 这个答案几乎是最优的。代替允许联系共享相同等级的“等级”,可以使用row_number()函数代替rank()。这比使用平局列更简单。 (3认同)
  • 我用真实数据集(500k+)进行了测试,需要 .coalesce(1) 。 (2认同)
  • 以上都没有给出正确的答案。对于这篇文章,我们需要一个更好的答案。 (2认同)

小智 17

用途windowrow_number功能。
按升序或降序排序以选择第一个或最后一个。

from pyspark.sql import Window
from pyspark.sql import functions as f

window = Window.partitionBy("col1").orderBy("datestr").asc()
df = (df.withColumn('row', f.row_number().over(window))\
.filter(col('row') == 1)
.drop('row')
.show())
Run Code Online (Sandbox Code Playgroud)

  • 这个答案已经存在/sf/answers/4097804091/ (2认同)

Car*_*llo 6

您可以使用带有 row_number 的窗口:

import pandas as pd
df1 = pd.DataFrame({'col1':range(0,5)})
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame({'col1':range(0,5)})
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame({'col1':range(0,5)})
df3['datestr'] = '2018-03-01'
dfall = spark.createDataFrame(pd.concat([df1,df2,df3]))

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col,row_number
window = Window.partitionBy('col1').orderBy(col('datestr'))
dfall.select('*', row_number().over(window).alias('posicion')).show()
dfall.select('*', row_number().over(window).alias('posicion')).where('posicion ==1').show()

+----+----------+--------+
|col1|   datestr|posicion|
+----+----------+--------+
|   0|2018-01-01|       1|
|   0|2018-02-01|       2|
|   0|2018-03-01|       3|
|   1|2018-01-01|       1|
|   1|2018-02-01|       2|
|   1|2018-03-01|       3|
|   3|2018-01-01|       1|
|   3|2018-02-01|       2|
|   3|2018-03-01|       3|
|   2|2018-01-01|       1|
|   2|2018-02-01|       2|
|   2|2018-03-01|       3|
|   4|2018-01-01|       1|
|   4|2018-02-01|       2|
|   4|2018-03-01|       3|
+----+----------+--------+
+----+----------+--------+
|col1|   datestr|posicion|
+----+----------+--------+
|   0|2018-01-01|       1|
|   1|2018-01-01|       1|
|   3|2018-01-01|       1|
|   2|2018-01-01|       1|
|   4|2018-01-01|       1|
+----+----------+--------+
Run Code Online (Sandbox Code Playgroud)


Mah*_*afy 5

我做了以下事情:

dataframe.groupBy("uniqueColumn").min("time")
Run Code Online (Sandbox Code Playgroud)

这将按给定的列分组,并在同一组内选择时间最短的那个(这将保留第一个并删除其他的)

  • 不过,您将需要后续连接来保留任何其他列。 (3认同)

Vee*_*rni 4

解决方案 1 添加一个新列 row num(增量列),并在对您感兴趣的所有列进行分组后基于最小行删除重复项。(您可以包括除 row num col 之外的所有用于删除重复项的列)

解决方案 2: 将数据帧转换为 rdd (df.rdd),然后将 rdd 分组到一个或多个或所有键上,然后在该组上运行 lambda 函数并按照您想要的方式删除行并仅返回你对。。。感兴趣。

我的一位朋友(同样)提到下面的(旧解决方案)对他不起作用。 默认情况下使用 dropDuplicates 方法它会保留第一次出现的情况。

  • 您能提供有效的来源吗? (5认同)