处理 Spark 中的架构不匹配

Ben*_*man 5 scala apache-spark

我正在 Scala 中使用 Spark 读取 csv 文件。该架构是预定义的,我正在使用它来阅读。这是示例代码:

// create the schema
val schema= StructType(Array(
      StructField("col1", IntegerType,false),
      StructField("col2", StringType,false),
      StructField("col3", StringType,true)))

// Initialize Spark session
val spark: SparkSession = SparkSession.builder
    .appName("Parquet Converter")
    .getOrCreate

// Create a data frame from a csv file
val dataFrame: DataFrame =
spark.read.format("csv").schema(schema).option("header", false).load(inputCsvPath)
Run Code Online (Sandbox Code Playgroud)

根据我在使用 Spark 使用模式阅读 cav 时所读到的内容,有 3 个选项:

  1. 将模式设置为DROPMALFORMED--> 这将删除与模式不匹配的行
  2. 将模式设置为PERMISSIVE--> 这会将整行设置为空值
  3. 将模式设置为FAILFAST--> 当发现不匹配时,这将引发异常

组合选项的最佳方式是什么?我想要的行为是获取架构中的不匹配项,将它们打印为错误并忽略数据框中的行。基本上,我想要 FAILFAST 和 DROPMALFORMED 的组合。

提前致谢

Ben*_*man 3

这就是我最终所做的:
我将“_corrupt_record”列添加到架构中,例如:

val schema= StructType(Array(
    StructField("col1", IntegerType,true),    
    StructField("col2", StringType,false),
    StructField("col3", StringType,true),
    StructField("_corrupt_record", StringType, true)))
Run Code Online (Sandbox Code Playgroud)

然后我使用 PERMISSIVE 模式读取 CSV(这是 Spark 默认模式):

val dataFrame: DataFrame = spark.read.format("csv")
                                .schema(schema)
                                .option("header", false)
                                .option("mode", "PERMISSIVE")
                                .load(inputCsvPath)
Run Code Online (Sandbox Code Playgroud)

现在,我的数据框包含一个附加列,其中包含架构不匹配的行。我过滤了数据不匹配的行并将其打印出来:

val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()
Run Code Online (Sandbox Code Playgroud)