RSG*_*RSG 0 hive scala apache-spark apache-spark-sql
为了遍历从 Hive 表创建的 Spark Dataframe 的列并更新所有出现的所需列值,我尝试了以下代码。
import org.apache.spark.sql.{DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
val a: DataFrame = spark.sql(s"select * from default.table_a")
val column_names: Array[String] = a.columns
val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date"))
val func = udf((value: String) => { if if (value == "XXXX" || value == "WWWW" || value == "TTTT") "NULL" else value } )
val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}
Run Code Online (Sandbox Code Playgroud)
在 spark shell 中执行代码时,出现以下错误。
scala> val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}
<console>:35: error: value a is not a member of org.apache.spark.sql.DataFrame
val b = {for (column: String <- required_column_list) { a.withColumn(column , isNull(a(column))) } a}
^
Run Code Online (Sandbox Code Playgroud)
我也尝试了以下语句,但没有得到所需的输出。
val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }
Run Code Online (Sandbox Code Playgroud)
变量 b 被创建为一个 Unit 而不是 Dataframe。
scala> val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }
b: Unit = ()
Run Code Online (Sandbox Code Playgroud)
请建议任何更好的方法来遍历 Dataframe 的列并更新列中所有出现的值或纠正我错误的地方。任何其他解决方案也受到赞赏。提前致谢。
而不是for 循环,您应该使用foldLeft. 而且你不需要udf函数,可以使用when 内置函数
val column_names: Array[String] = a.columns
val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date"))
import org.apache.spark.sql.functions._
val b = required_columns.foldLeft(a){(tempdf, colName) => tempdf.withColumn(colName, when(col(colName) === "XXX" || col(colName) === "WWWW" || col(colName) === "TTTT", "NULL").otherwise(col(colName)))}
Run Code Online (Sandbox Code Playgroud)
我希望答案有帮助
在
required_columns.foldLeft(a){(tempdf, colName) => tempdf.withColumn(colName, when(col(colName) === "XXX" || col(colName) === "WWWW" || col(colName) === "TTTT", "NULL").otherwise(col(colName)))}
required_columns是来自a数据框/数据集的列名数组,_date作为结束字符串,它们是colName内部withColumn
tempdf 是原始数据框/数据集,即 a
当在内部应用函数时withColumn,将所有XXX或WWWWW或TTTT值替换为NULL
最后foldLeft将所有转换应用的数据帧返回到b