Ben*_*man 5 scala apache-spark
我正在 Scala 中使用 Spark 读取 csv 文件。该架构是预定义的,我正在使用它来阅读。这是示例代码:
// create the schema
val schema= StructType(Array(
StructField("col1", IntegerType,false),
StructField("col2", StringType,false),
StructField("col3", StringType,true)))
// Initialize Spark session
val spark: SparkSession = SparkSession.builder
.appName("Parquet Converter")
.getOrCreate
// Create a data frame from a csv file
val dataFrame: DataFrame =
spark.read.format("csv").schema(schema).option("header", false).load(inputCsvPath)
Run Code Online (Sandbox Code Playgroud)
根据我在使用 Spark 使用模式阅读 cav 时所读到的内容,有 3 个选项:
DROPMALFORMED--> 这将删除与模式不匹配的行PERMISSIVE--> 这会将整行设置为空值FAILFAST--> 当发现不匹配时,这将引发异常组合选项的最佳方式是什么?我想要的行为是获取架构中的不匹配项,将它们打印为错误并忽略数据框中的行。基本上,我想要 FAILFAST 和 DROPMALFORMED 的组合。
提前致谢
这就是我最终所做的:
我将“_corrupt_record”列添加到架构中,例如:
val schema= StructType(Array(
StructField("col1", IntegerType,true),
StructField("col2", StringType,false),
StructField("col3", StringType,true),
StructField("_corrupt_record", StringType, true)))
Run Code Online (Sandbox Code Playgroud)
然后我使用 PERMISSIVE 模式读取 CSV(这是 Spark 默认模式):
val dataFrame: DataFrame = spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "PERMISSIVE")
.load(inputCsvPath)
Run Code Online (Sandbox Code Playgroud)
现在,我的数据框包含一个附加列,其中包含架构不匹配的行。我过滤了数据不匹配的行并将其打印出来:
val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7928 次 |
| 最近记录: |