如何处理spark sql中缺失的列

Vij*_*vva 2 scala apache-spark apache-spark-sql

我们正在处理无架构的 JSON 数据,有时 Spark 作业会失败,因为我们在 Spark SQL 中引用的某些列在一天中的某些时间不可用。在这些时间内,Spark 作业会失败,因为所引用的列在数据框中不可用。如何处理这种情况?我已经尝试过 UDF,但缺少太多列,因此无法真正检查每一列的可用性。我还尝试在更大的数据集上推断模式,并将其应用到数据帧上,期望缺失的列将用 null 填充,但模式应用程序失败并出现奇怪的错误。

请建议

小智 6

这对我有用。创建了一个函数来检查所有预期的列并将列添加到数据帧(如果缺少)

def checkAvailableColumns(df: DataFrame, expectedColumnsInput: List[String]) : DataFrame = {
    expectedColumnsInput.foldLeft(df) {
        (df,column) => {
            if(df.columns.contains(column) == false) {
                df.withColumn(column,lit(null).cast(StringType))
            }
            else (df)
        }
    }
}

val expectedColumns = List("newcol1","newcol2","newcol3")

val finalDf = checkAvailableColumns(castedDateSessions,expectedColumns)
Run Code Online (Sandbox Code Playgroud)