Keep only the duplicates in a DataFrame with respect to certain fields

Ann*_*eso 4 apache-spark pyspark spark-dataframe

I have this Spark DataFrame:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|AUTRE|     2|null|    08:58:00|    23:29:00|
|TDR|  QWA|     3|null|    08:57:00|    23:28:00|
|ALT| TEST|     4|null|    08:56:00|    23:27:00|
|ALT|  QWA|     6|null|    08:55:00|    23:26:00|
|ALT|  QWA|     2|null|    08:54:00|    23:25:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+
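For reference, here is a minimal sketch to rebuild this sample DataFrame (my reconstruction, not from the original post; it assumes an existing SparkSession and treats the hour columns as plain strings). An explicit schema is needed because Name is null in every row, which would otherwise break type inference:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

# Explicit schema: the all-null Name column cannot be inferred automatically
schema = StructType([
    StructField("ID", StringType()),
    StructField("ID2", StringType()),
    StructField("Number", IntegerType()),
    StructField("Name", StringType()),
    StructField("Opening_Hour", StringType()),
    StructField("Closing_Hour", StringType()),
])

data = [
    ("ALT", "QWA", 6, None, "08:59:00", "23:30:00"),
    ("ALT", "AUTRE", 2, None, "08:58:00", "23:29:00"),
    ("TDR", "QWA", 3, None, "08:57:00", "23:28:00"),
    ("ALT", "TEST", 4, None, "08:56:00", "23:27:00"),
    ("ALT", "QWA", 6, None, "08:55:00", "23:26:00"),
    ("ALT", "QWA", 2, None, "08:54:00", "23:25:00"),
    ("ALT", "QWA", 2, None, "08:53:00", "23:24:00"),
]
df = spark.createDataFrame(data, schema)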

I want to get a new DataFrame containing only the rows that are not unique with respect to the three fields "ID", "ID2" and "Number".

That means I want this DataFrame:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+

Or maybe a DataFrame containing all the duplicates:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|  QWA|     6|null|    08:55:00|    23:26:00|
|ALT|  QWA|     2|null|    08:54:00|    23:25:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+

pau*_*ult 12

One way to do this is to use a pyspark.sql.Window to add a column that counts the number of duplicates for each row's ("ID", "ID2", "Number") combination, and then select only the rows where the duplicate count is greater than 1.

import pyspark.sql.functions as f
from pyspark.sql import Window

# Count how many rows share each ('ID', 'ID2', 'Number') combination
w = Window.partitionBy('ID', 'ID2', 'Number')
df.select('*', f.count('ID').over(w).alias('dupeCount'))\
    .where('dupeCount > 1')\
    .drop('dupeCount')\
    .show()
#+---+---+------+----+------------+------------+
#| ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
#+---+---+------+----+------------+------------+
#|ALT|QWA|     2|null|    08:54:00|    23:25:00|
#|ALT|QWA|     2|null|    08:53:00|    23:24:00|
#|ALT|QWA|     6|null|    08:59:00|    23:30:00|
#|ALT|QWA|     6|null|    08:55:00|    23:26:00|
#+---+---+------+----+------------+------------+

I used pyspark.sql.functions.count() to count the number of rows in each group. This returns a DataFrame containing all of the duplicates (the second output you showed).

If you want only one row per ("ID", "ID2", "Number") combination, you can use another Window to order the rows.

For example, below I add another column for the row_number and select only the rows where the duplicate count is greater than 1 and the row number is equal to 1. This guarantees exactly one row per group.

# Note: ordering by the same keys used for partitioning means every row in a group ties
w2 = Window.partitionBy('ID', 'ID2', 'Number').orderBy('ID', 'ID2', 'Number')
df.select(
        '*',
        f.count('ID').over(w).alias('dupeCount'),
        f.row_number().over(w2).alias('rowNum')
    )\
    .where('(dupeCount > 1) AND (rowNum = 1)')\
    .drop('dupeCount', 'rowNum')\
    .show()
#+---+---+------+----+------------+------------+
#| ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
#+---+---+------+----+------------+------------+
#|ALT|QWA|     2|null|    08:54:00|    23:25:00|
#|ALT|QWA|     6|null|    08:59:00|    23:30:00|
#+---+---+------+----+------------+------------+
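A small caveat worth adding (my note, not part of the original answer): since w2 orders by the same columns it partitions by, all rows within a group tie and row_number picks an arbitrary one. If you need a deterministic pick, order the window by a tie-breaking column instead; for example, a sketch that keeps the earliest Opening_Hour in each group (w3 is my name for the new window):

# Order each group by Opening_Hour so that rowNum = 1 is the earliest row
w3 = Window.partitionBy('ID', 'ID2', 'Number').orderBy('Opening_Hour')
df.select(
        '*',
        f.count('ID').over(w).alias('dupeCount'),
        f.row_number().over(w3).alias('rowNum')
    )\
    .where('(dupeCount > 1) AND (rowNum = 1)')\
    .drop('dupeCount', 'rowNum')\
    .show()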


Mar*_*man 12

Here is a way to do it without a Window.

DataFrame with the duplicates (df.exceptAll, available since Spark 2.4, subtracts one occurrence of each distinct combination, so only the surplus duplicate rows remain):

df.exceptAll(df.drop_duplicates(['ID', 'ID2', 'Number'])).show()
# +---+---+------+------------+------------+
# | ID|ID2|Number|Opening_Hour|Closing_Hour|
# +---+---+------+------------+------------+
# |ALT|QWA|     2|    08:53:00|    23:24:00|
# |ALT|QWA|     6|    08:55:00|    23:26:00|
# +---+---+------+------------+------------+

DataFrame with all duplicates (using a left_anti join to discard the combinations that occur only once):

df.join(df.groupBy('ID', 'ID2', 'Number')\
          .count().where('count = 1').drop('count'),
        on=['ID', 'ID2', 'Number'],
        how='left_anti').show()
# +---+---+------+------------+------------+
# | ID|ID2|Number|Opening_Hour|Closing_Hour|
# +---+---+------+------------+------------+
# |ALT|QWA|     2|    08:54:00|    23:25:00|
# |ALT|QWA|     2|    08:53:00|    23:24:00|
# |ALT|QWA|     6|    08:59:00|    23:30:00|
# |ALT|QWA|     6|    08:55:00|    23:26:00|
# +---+---+------+------------+------------+
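As a complementary sketch (my variant, not from the answer above): the mirror-image query keeps the duplicated combinations directly, using a left_semi join against the groups whose count is greater than 1:

df.join(df.groupBy('ID', 'ID2', 'Number')
          .count().where('count > 1').drop('count'),
        on=['ID', 'ID2', 'Number'],
        how='left_semi').show()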


Tho*_*mas 5

To build on pault's very good answer: I often need to subset a DataFrame down to entries that are repeated exactly x times, and since I need to do this often, I turned it into a function that I import along with a lot of other helper functions at the beginning of my scripts:

import pyspark.sql.functions as f
from pyspark.sql import Window

def get_entries_with_frequency(df, cols, num):
    # Accept a single column name as well as a list of column names
    if isinstance(cols, str):
        cols = [cols]
    # Count the rows in each group of `cols` and keep those repeated exactly `num` times
    w = Window.partitionBy(cols)
    return df.select('*', f.count(cols[0]).over(w).alias('dupeCount'))\
             .where("dupeCount = {}".format(num))\
             .drop('dupeCount')
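For example, applied to the question's DataFrame, calling the helper with the three key columns and num=2 returns the four duplicated rows shown earlier (the calls below are my illustration; note the function filters on an exact count, unlike the 'dupeCount > 1' filter above):

# Rows whose ('ID', 'ID2', 'Number') combination occurs exactly twice
get_entries_with_frequency(df, ['ID', 'ID2', 'Number'], 2).show()

# A single column name also works, thanks to the isinstance check
get_entries_with_frequency(df, 'ID2', 5).show()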