Tw *_*Nus 8 apache-spark apache-spark-sql pyspark pyspark-sql
我有一个Spark sql数据帧,由一ID列和n"数据"列组成,即
id | dat1 | dat2 | ... | datn
Run Code Online (Sandbox Code Playgroud)
所述idcolumnn是唯一确定的,反之,在看dat1 ... datn有可能重复.
我的目标是找到id那些重复的.
到目前为止我的方法:
使用groupBy以下方法获取重复行:
dup_df = df.groupBy(df.columns[1:]).count().filter('count > 1')
加入dup_df整个df以获取重复的行,包括 id:
df.join(dup_df, df.columns[1:])
我很确定这基本上是正确的,它失败了,因为dat1 ... datn列包含null值.
要做到join的null价值观,我发现.eg 这个SO职位.但这需要构建一个巨大的"字符串连接条件".
因此我的问题:
joins处理null价值观?ids?BTW:我使用的是Spark 2.1.0和Python 3.5.3
use*_*411 12
如果ids每组的数量相对较小,你可以groupBy和collect_list.需要进口
from pyspark.sql.functions import collect_list, size
Run Code Online (Sandbox Code Playgroud)
示例数据:
df = sc.parallelize([
(1, "a", "b", 3),
(2, None, "f", None),
(3, "g", "h", 4),
(4, None, "f", None),
(5, "a", "b", 3)
]).toDF(["id"])
Run Code Online (Sandbox Code Playgroud)
查询:
(df
.groupBy(df.columns[1:])
.agg(collect_list("id").alias("ids"))
.where(size("ids") > 1))
Run Code Online (Sandbox Code Playgroud)
结果:
+----+---+----+------+
| _2| _3| _4| ids|
+----+---+----+------+
|null| f|null|[2, 4]|
| a| b| 3|[1, 5]|
+----+---+----+------+
Run Code Online (Sandbox Code Playgroud)
您可以将explode两次(或使用一次udf)应用于与返回的输出相当的输出join.
您还可以使用每组最小的组来识别id组.一些额外的进口:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, count, min
Run Code Online (Sandbox Code Playgroud)
窗口定义:
w = Window.partitionBy(df.columns[1:])
Run Code Online (Sandbox Code Playgroud)
查询:
(df
.select(
"*",
count("*").over(w).alias("_cnt"),
min("id").over(w).alias("group"))
.where(col("_cnt") > 1))
Run Code Online (Sandbox Code Playgroud)
结果:
+---+----+---+----+----+-----+
| id| _2| _3| _4|_cnt|group|
+---+----+---+----+----+-----+
| 2|null| f|null| 2| 2|
| 4|null| f|null| 2| 2|
| 1| a| b| 3| 2| 1|
| 5| a| b| 3| 2| 1|
+---+----+---+----+----+-----+
Run Code Online (Sandbox Code Playgroud)
您可以进一步使用group列进行自联接.
| 归档时间: |
|
| 查看次数: |
8422 次 |
| 最近记录: |