ad_*_*d_s 14 duplicates dataframe apache-spark
问题:在丢弃重复项时,在pandas中,您可以指定要保留的列.Spark Dataframes中是否有等价物?
熊猫:
df.sort_values('actual_datetime', ascending=False).drop_duplicates(subset=['scheduled_datetime', 'flt_flightnumber'], keep='first')
Run Code Online (Sandbox Code Playgroud)
Spark数据帧(我使用Spark 1.6.0)没有keep选项
df.orderBy(['actual_datetime']).dropDuplicates(subset=['scheduled_datetime', 'flt_flightnumber'])
Run Code Online (Sandbox Code Playgroud)
想象一下'scheduled_datetime'和'flt_flightnumber'是第6,17列.通过基于这些列的值创建密钥,我们还可以进行重复数据删除
def get_key(x):
return "{0}{1}".format(x[6],x[17])
df= df.map(lambda x: (get_key(x),x)).reduceByKey(lambda x,y: (x))
Run Code Online (Sandbox Code Playgroud)
但如何指定保留第一行并摆脱其他重复?最后一排怎么样?
tim*_*mle 25
对于每个说dropDuplicates都会保持第一次出现的人- 严格来说这不是正确的。
dropDuplicates保留排序操作的“第一次出现”-仅在有1个分区的情况下。请参见下面的示例。
但是,这对于大多数Spark数据集不切实际。因此,我还提供了一个使用Window函数+ sort + rank + filter的“首次出现”放置重复操作的示例。
例如,参见帖子底部。
使用pyspark在Spark 2.4.0中对此进行了测试。
import pandas as pd
# generating some example data with pandas, will convert to spark df below
df1 = pd.DataFrame({'col1':range(0,5)})
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame({'col1':range(0,5)})
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame({'col1':range(0,5)})
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
print(dfall)
Run Code Online (Sandbox Code Playgroud)
col1 datestr
0 0 2018-01-01
1 1 2018-01-01
2 2 2018-01-01
3 3 2018-01-01
4 4 2018-01-01
0 0 2018-02-01
1 1 2018-02-01
2 2 2018-02-01
3 3 2018-02-01
4 4 2018-02-01
0 0 2018-03-01
1 1 2018-03-01
2 2 2018-03-01
3 3 2018-03-01
4 4 2018-03-01
Run Code Online (Sandbox Code Playgroud)
col1 datestr
0 0 2018-01-01
1 1 2018-01-01
2 2 2018-01-01
3 3 2018-01-01
4 4 2018-01-01
0 0 2018-02-01
1 1 2018-02-01
2 2 2018-02-01
3 3 2018-02-01
4 4 2018-02-01
0 0 2018-03-01
1 1 2018-03-01
2 2 2018-03-01
3 3 2018-03-01
4 4 2018-03-01
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-03-01|
| 1|2018-02-01|
| 3|2018-02-01|
| 2|2018-02-01|
| 4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
# first example
# does not give first (based on datestr)
(spark.createDataFrame(dfall)
.orderBy('datestr')
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates NOT based on occurrence of sorted datestr
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-02-01|
| 1|2018-01-01|
| 3|2018-02-01|
| 2|2018-02-01|
| 4|2018-02-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-03-01|
| 1|2018-02-01|
| 3|2018-02-01|
| 2|2018-02-01|
| 4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 2|2018-01-01|
| 3|2018-01-01|
| 4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
# second example
# testing what happens with repartition
(spark.createDataFrame(dfall)
.orderBy('datestr')
.repartition('datestr')
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates NOT based on occurrence of sorted datestr
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-03-01|
| 1|2018-03-01|
| 2|2018-03-01|
| 3|2018-03-01|
| 4|2018-03-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-02-01|
| 1|2018-01-01|
| 3|2018-02-01|
| 2|2018-02-01|
| 4|2018-02-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
#third example
# testing with coalesce(1)
(spark
.createDataFrame(dfall)
.orderBy('datestr')
.coalesce(1)
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates based on occurrence of sorted datestr
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 3|2018-01-01|
| 2|2018-01-01|
| 4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 2|2018-01-01|
| 3|2018-01-01|
| 4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
col1 datestr
0 0 2018-01-01
1 1 2018-01-01
2 2 2018-01-01
3 3 2018-01-01
4 4 2018-01-01
0 0 2018-01-01
1 1 2018-01-01
2 2 2018-01-01
3 3 2018-01-01
4 4 2018-01-01
0 0 2018-03-01
1 1 2018-03-01
2 2 2018-03-01
3 3 2018-03-01
4 4 2018-03-01
Run Code Online (Sandbox Code Playgroud)
# fourth example
# testing with reverse sort then coalesce(1)
(spark
.createDataFrame(dfall)
.orderBy('datestr', ascending = False)
.coalesce(1)
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates based on occurrence of sorted datestr```
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 0|2018-01-01|
| 1|2018-01-01|
| 1|2018-01-01|
| 3|2018-01-01|
| 3|2018-01-01|
| 2|2018-01-01|
| 2|2018-01-01|
| 4|2018-01-01|
| 4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-03-01|
| 1|2018-03-01|
| 2|2018-03-01|
| 3|2018-03-01|
| 4|2018-03-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 3|2018-01-01|
| 2|2018-01-01|
| 4|2018-01-01|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
小智 17
用途window和row_number功能。
按升序或降序排序以选择第一个或最后一个。
from pyspark.sql import Window
from pyspark.sql import functions as f
window = Window.partitionBy("col1").orderBy("datestr").asc()
df = (df.withColumn('row', f.row_number().over(window))\
.filter(col('row') == 1)
.drop('row')
.show())
Run Code Online (Sandbox Code Playgroud)
您可以使用带有 row_number 的窗口:
import pandas as pd
df1 = pd.DataFrame({'col1':range(0,5)})
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame({'col1':range(0,5)})
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame({'col1':range(0,5)})
df3['datestr'] = '2018-03-01'
dfall = spark.createDataFrame(pd.concat([df1,df2,df3]))
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col,row_number
window = Window.partitionBy('col1').orderBy(col('datestr'))
dfall.select('*', row_number().over(window).alias('posicion')).show()
dfall.select('*', row_number().over(window).alias('posicion')).where('posicion ==1').show()
+----+----------+--------+
|col1| datestr|posicion|
+----+----------+--------+
| 0|2018-01-01| 1|
| 0|2018-02-01| 2|
| 0|2018-03-01| 3|
| 1|2018-01-01| 1|
| 1|2018-02-01| 2|
| 1|2018-03-01| 3|
| 3|2018-01-01| 1|
| 3|2018-02-01| 2|
| 3|2018-03-01| 3|
| 2|2018-01-01| 1|
| 2|2018-02-01| 2|
| 2|2018-03-01| 3|
| 4|2018-01-01| 1|
| 4|2018-02-01| 2|
| 4|2018-03-01| 3|
+----+----------+--------+
+----+----------+--------+
|col1| datestr|posicion|
+----+----------+--------+
| 0|2018-01-01| 1|
| 1|2018-01-01| 1|
| 3|2018-01-01| 1|
| 2|2018-01-01| 1|
| 4|2018-01-01| 1|
+----+----------+--------+
Run Code Online (Sandbox Code Playgroud)
我做了以下事情:
dataframe.groupBy("uniqueColumn").min("time")
Run Code Online (Sandbox Code Playgroud)
这将按给定的列分组,并在同一组内选择时间最短的那个(这将保留第一个并删除其他的)
解决方案 1 添加一个新列 row num(增量列),并在对您感兴趣的所有列进行分组后基于最小行删除重复项。(您可以包括除 row num col 之外的所有用于删除重复项的列)
解决方案 2: 将数据帧转换为 rdd (df.rdd),然后将 rdd 分组到一个或多个或所有键上,然后在该组上运行 lambda 函数并按照您想要的方式删除行并仅返回你对。。。感兴趣。
我的一位朋友(同样)提到下面的(旧解决方案)对他不起作用。 默认情况下使用 dropDuplicates 方法它会保留第一次出现的情况。