Yas*_*ava 6 scala apache-spark
简要问题:
对于更直接的查询,我想依次遍历所有行,并根据特定行的某些条件为某些变量(a,b,c)分配一些值,然后我将分配其中1个的值变量放入该特定行的一列中。
详细说明:
我想在Spark中更新数据框中的列值。更新将是有条件的,其中我将在行上运行循环并根据该行其他列的值更新一列。
我尝试使用withColumn方法,但出现错误。请提出任何其他方法。withColumn方法的解析也将有很大帮助。
表:
var table1 = Seq((11, 25, 2, 0), (42, 20, 10, 0)).toDF("col_1", "col_2", "col_3", "col_4")
table1.show()
Run Code Online (Sandbox Code Playgroud)
架构:
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 11| 25| 2| 0|
| 42| 20| 10| 0|
+-----+-----+-----+-----+
Run Code Online (Sandbox Code Playgroud)
我在这里尝试了2种方法:
在下面的代码中,根据条件,仅需要以这种方式放置在不同位置初始化的变量
代码:
for(i <- table1.rdd.collect()) {
if(i.getAs[Int]("col_1") > 0) {
var adj_a = 0
var adj_c = 0
if(i.getAs[Int]("col_1") > (i.getAs[Int]("col_2") + i.getAs[Int]("col_3"))) {
if(i.getAs[Int]("col_1") < i.getAs[Int]("col_2")) {
adj_a = 10
adj_c = 2
}
else {
adj_a = 5
}
}
else {
adj_c = 1
}
adj_c = adj_c + i.getAs[Int]("col_2")
table1.withColumn("col_4", adj_c)
//i("col_4") = adj_c
}
}
Run Code Online (Sandbox Code Playgroud)
第一种情况下的错误:
table1.withColumn(“ col_4”,adj_c)
<console>:80: error: type mismatch;
found : Int
required: org.apache.spark.sql.Column
table1.withColumn("col_4", adj_c)
^
Run Code Online (Sandbox Code Playgroud)
我也在这里尝试使用col(adj_c),但是它开始失败
<console>:80: error: type mismatch;
found : Int
required: String
table1.withColumn("col_4", col(adj_c))
^
Run Code Online (Sandbox Code Playgroud)
第2种情况下的错误:
(i(“ col_4”)= adj_c)
<console>:81: error: value update is not a member of org.apache.spark.sql.Row
i("col_4") = adj_c
^
Run Code Online (Sandbox Code Playgroud)
我希望输出表是:
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 11| 25| 2| 1|
| 42| 20| 10| 5|
+-----+-----+-----+-----+
Run Code Online (Sandbox Code Playgroud)
请提出可能的解决方案,如有疑问,请回复。
由于存在问题,请帮助我。任何建议都会非常有帮助。
您应该使用when
函数而不是这种复杂的语法,也不需要显式循环,Spark 会自行处理。当您执行 a 时,withColumn
它会应用于每一行
table1.withColumn("col_4", when($"col_1" > $"col_2" + $"col_3", 5).otherwise(1)).show
Run Code Online (Sandbox Code Playgroud)
快速测试:
输入
table1.show
-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 11| 25| 2| 0|
| 42| 20| 10| 0|
+-----+-----+-----+-----+
Run Code Online (Sandbox Code Playgroud)
输出
table1.withColumn("col_4", when($"col_1" > $"col_2" + $"col_3", lit(5)).otherwise(1)).show
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
| 11| 25| 2| 1|
| 42| 20| 10| 5|
+-----+-----+-----+-----+
Run Code Online (Sandbox Code Playgroud)
UDF 可以与任何自定义逻辑一起使用以计算列值,例如:
val calculateCol4 = (col_1:Int, col_2:Int, col_3:Int) =>
if (col_1 > 0) {
var adj_a = 0
var adj_c = 0
if (col_1 > col_2 + col_3) {
if (col_1 < col_2) {
adj_a = 10
adj_c = 2
}
else {
adj_a = 5
}
}
else {
adj_c = 1
}
println("adj_c: "+adj_c)
adj_c = adj_c + col_2
// added for return correct result
adj_c
}
// added for return correct result
else 0
val col4UDF = udf(calculateCol4)
table1.withColumn("col_4",col4UDF($"col_1", $"col_2", $"col_3"))
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
552 次 |
最近记录: |