Spark: drop multiple duplicated columns after a join

kpc*_*der 1 apache-spark apache-spark-sql

After joining two dataframes I end up with many duplicated columns, and now I want to drop the ones that occur last. Below is my printSchema:

root
 |-- id: string (nullable = true)
 |-- value: string (nullable = true)
 |-- test: string (nullable = true)
 |-- details: string (nullable = true)
 |-- test: string (nullable = true)
 |-- value: string (nullable = true)

Now I want to drop these last two columns:

 |-- test: string (nullable = true)
 |-- value: string (nullable = true)

I tried df.dropDuplicates(), but it drops everything (it deduplicates rows, not columns).

How can I drop the duplicated columns that occur last?

sta*_*106 6

You have to take the column names from an array and pass them to drop using the vararg syntax. Check the following:

scala> dfx.show
+---+---+---+---+------------+------+
|  A|  B|  C|  D|         arr|mincol|
+---+---+---+---+------------+------+
|  1|  2|  3|  4|[1, 2, 3, 4]|     A|
|  5|  4|  3|  1|[5, 4, 3, 1]|     D|
+---+---+---+---+------------+------+

scala> dfx.columns
res120: Array[String] = Array(A, B, C, D, arr, mincol)

scala> val dropcols = Array("arr","mincol")
dropcols: Array[String] = Array(arr, mincol)

scala> dfx.drop(dropcols:_*).show
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  1|  2|  3|  4|
|  5|  4|  3|  1|
+---+---+---+---+


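If you do not know the duplicated names in advance, something along these lines (a rough sketch, not run in the session above) keeps only the first occurrence of each column name. Renaming every column to a unique positional name first avoids ambiguous references:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Keep only the first occurrence of each column name.
def dropLaterDuplicates(df: DataFrame): DataFrame = {
  val names = df.columns
  // Rename every column to a unique positional name, e.g. test_2, test_4.
  val unique = names.zipWithIndex.map { case (n, i) => s"${n}_$i" }
  // Index of the first occurrence of each distinct name.
  val firstIdx = names.zipWithIndex.groupBy(_._1).map(_._2.head._2).toSet
  df.toDF(unique: _*).select(
    names.zipWithIndex.collect {
      case (n, i) if firstIdx(i) => col(s"${n}_$i").as(n)
    }: _*
  )
}

Applied to the schema in the question, this would keep the first id, value, test and details, and drop the trailing test and value.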

Update 1:

scala>  val df = Seq((1,2,3,4),(5,4,3,1)).toDF("A","B","C","D")
df: org.apache.spark.sql.DataFrame = [A: int, B: int ... 2 more fields]

scala> val df2 = df.select("A","B","C")
df2: org.apache.spark.sql.DataFrame = [A: int, B: int ... 1 more field]

scala> df.alias("t1").join(df2.alias("t2"),Seq("A"),"inner").show
+---+---+---+---+---+---+
|  A|  B|  C|  D|  B|  C|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  2|  3|
|  5|  4|  3|  1|  4|  3|
+---+---+---+---+---+---+


scala> df.alias("t1").join(df2.alias("t2"),Seq("A"),"inner").drop($"t2.B").drop($"t2.C").show
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  1|  2|  3|  4|
|  5|  4|  3|  1|
+---+---+---+---+


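If you only need the columns from one side, selecting t1.* after the join achieves the same result in one step. A sketch using the df and df2 from above (note the explicit join condition, so both aliases survive the join):

import org.apache.spark.sql.functions.col

// Join on an explicit condition so both sides keep their alias, then
// take every column from the left alias only.
val joined = df.alias("t1").join(df2.alias("t2"), col("t1.A") === col("t2.A"), "inner")
val leftOnly = joined.select("t1.*")   // columns: A, B, C, D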

Update 2:

To drop the columns dynamically, check the solution below.

scala> val df = Seq((1,2,3,4),(5,4,3,1)).toDF("A","B","C","D")
df: org.apache.spark.sql.DataFrame = [A: int, B: int ... 2 more fields]

scala> val df2 = Seq((1,9,9),(5,8,8)).toDF("A","B","C")
df2: org.apache.spark.sql.DataFrame = [A: int, B: int ... 1 more field]

scala> val df3 = df.alias("t1").join(df2.alias("t2"),Seq("A"),"inner")
df3: org.apache.spark.sql.DataFrame = [A: int, B: int ... 4 more fields]

scala> df3.show
+---+---+---+---+---+---+
|  A|  B|  C|  D|  B|  C|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  9|  9|
|  5|  4|  3|  1|  8|  8|
+---+---+---+---+---+---+

scala> val rem1 = Array("B","C")
rem1: Array[String] = Array(B, C)

scala> val rem2 = rem1.map(x=>"t2."+x)
rem2: Array[String] = Array(t2.B, t2.C)

scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame

scala> val df4 = rem2.foldLeft(df3) { (acc: DataFrame, colName: String) => acc.drop(col(colName)) }
df4: org.apache.spark.sql.DataFrame = [A: int, B: int ... 2 more fields]

scala>  df4.show
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  1|  2|  3|  4|
|  5|  4|  3|  1|
+---+---+---+---+


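Instead of hard-coding rem1, the drop list can also be derived from df2 itself: every column of the right-hand frame except the join key. A sketch, assuming "A" is the only join key:

import org.apache.spark.sql.functions.col  // already in scope in spark-shell

// Every df2 column except the join key "A" duplicates a df column here.
val rem = df2.columns.filter(_ != "A")
val df4b = rem.map("t2." + _).foldLeft(df3)((acc, c) => acc.drop(col(c)))
// df4b: [A: int, B: int, C: int, D: int]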

Update 3:

Renaming/aliasing all columns in one pass.

scala> val dfa = Seq((1,2,3,4),(5,4,3,1)).toDF("A","B","C","D")
dfa: org.apache.spark.sql.DataFrame = [A: int, B: int ... 2 more fields]

scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame

scala> val dfa2 = dfa.columns.foldLeft(dfa) { (acc: DataFrame, colName: String) => acc.withColumnRenamed(colName, colName + "_2") }
dfa2: org.apache.spark.sql.DataFrame = [A_2: int, B_2: int ... 2 more fields]

scala> dfa2.show
+---+---+---+---+
|A_2|B_2|C_2|D_2|
+---+---+---+---+
|  1|  2|  3|  4|
|  5|  4|  3|  1|
+---+---+---+---+


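The same rename can be written as a single toDF call instead of a foldLeft, since toDF assigns names by position (an equivalent sketch):

// Map a suffix over the existing names and rename every column at once.
val dfa3 = dfa.toDF(dfa.columns.map(_ + "_2"): _*)
// dfa3: [A_2: int, B_2: int, C_2: int, D_2: int]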