Man*_*ake 1 dataframe apache-spark
我有以下输入文件,可能有错误记录,我想抛出异常并识别列名列不符合我的自定义架构。根据我的理解,即使我们不对数据帧调用任何操作,数据帧也应该立即抛出异常。
1,a,10000,11-03-2019,浦那
2,b,10020,14-03-2019,浦那
3,a,34567,15-03-2019,浦那
tui,a,fgh-03-2019,浦那
4,b,10020,14-03-2019,浦那
我尝试为 Spark 数据框设置“FAILFAST”选项,但它并没有在我的末尾引发任何类型的异常。
我已经尝试过下面的代码。
SparkSession ss = SparkSession.builder().appName("Data Quality Frameowrk")
.master("local")
.getOrCreate();
try {
StructField[] fields = new StructField[5];
fields[0] = new StructField("id", DataTypes.IntegerType, false,Metadata.empty());
fields[1] = new StructField("name", DataTypes.StringType, false,Metadata.empty());
fields[2] = new StructField("salary", DataTypes.DoubleType, false,Metadata.empty());
fields[3] = new StructField("dob", DataTypes.DateType, false,Metadata.empty());
fields[4] = new StructField("loc", DataTypes.StringType, false,Metadata.empty());
StructType customSchema = new StructType(fields);
ss.read().format("csv")
.schema(customSchema)
.option("mode", "FAILFAST")
.load("C:\\\\Users\\\\manoj.dhake\\\\Downloads\\\\softwares\\\\neo4jdata\\\\employee.csv");
}catch(Exception e) {
System.out.println("want to catch column name ,due to which error has been occured");
e.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)
注意:程序应该能够在数据类型不匹配的情况下捕获列名,并进一步继续执行流程(不应异常终止)。
这是因为Spark是惰性的,它在调用时甚至不读取数据load,只有处理数据帧才会触发实际的读取。根据文件
FAILFAST :遇到损坏的记录时抛出异常。
所以它与使负载急切无关。可以通过手动触发处理来急切地完成验证,但如果所有条目都有效,这将导致所有数据被处理两次。使用以下方法可以在一定程度上减轻性能影响cache:
val df = spark.read
.schema(StructType(Seq(StructField("test", IntegerType))))
.option("mode", "FAILFAST")
.csv(Seq("a").toDS())
.cache()
df.count()
Run Code Online (Sandbox Code Playgroud)
会扔
aorg.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.
Run Code Online (Sandbox Code Playgroud)