I am trying to read large files in Spark Scala and then perform a join. When I test with smaller files it works fine, but with the larger files I sometimes get the error below.
I managed to isolate one of the files that fails. The file is 1 GB in size, and the error is thrown towards the end while creating the partitions, where I split the lines of the file to get the columns.
Right after this line:
val rdd = sc.textFile(mainFileURL)
val header = rdd.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
println(schema)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
This is the culprit line
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
Please suggest how I can handle this.
When I do rdd.count I get a value back, but when I do data.count() I get the following error:
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 37
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, uniqueFundamentalSet), StringType), true) AS uniqueFundamentalSet#0
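The ArrayIndexOutOfBoundsException: 37 suggests that some lines produce fewer fields after the split than the schema expects, so encoding the Row fails when a missing field index is accessed. A quick way to see the offending lines is to count the rows whose field count differs from the header (a minimal sketch using the same rdd and header as above; expectedFields and badLines are names introduced here):
val expectedFields = header.length
// Lines whose field count differs from the header's.
// Note: String.split(regex) drops trailing empty fields by default, which is a
// common reason a row ends up with fewer fields than expected.
val badLines = rdd
  .filter(!_.contains("uniqueFundamentalSet"))
  .map(line => line.split("\\|\\^\\|"))
  .filter(_.length != expectedFields)
println(s"Malformed lines: ${badLines.count()}")
badLines.take(5).foreach(fields => println(fields.mkString("|^|")))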
Here is my sample data set
val rdd = sc.textFile(mainFileURL)
val header = rdd.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
println(schema)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
Filter out the mismatched rows
One of the simplest ways is to filter out all the rows whose length does not match the schema, before the schema is applied to form a dataframe, as follows:
val requiredNumberOfFields = schema.fieldNames.length // number of columns required by the schema
val data = sqlContext
  .createDataFrame(
    rdd
      .filter(!_.contains("uniqueFundamentalSet"))
      .map(line => line.split("\\|\\^\\|"))
      .filter(_.length == requiredNumberOfFields) // keep only the rows that have exactly as many fields as the schema requires
      .map(x => Row.fromSeq(x.toSeq)),
    schema)
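As a quick sanity check after this approach, you could compare the counts (a sketch assuming the same rdd and data as above; totalRows and keptRows are names introduced here):
val totalRows = rdd.filter(!_.contains("uniqueFundamentalSet")).count() // all data rows, well-formed or not
val keptRows = data.count()                                             // rows whose length matched the schema
println(s"total=$totalRows kept=$keptRows dropped=${totalRows - keptRows}")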
Add dummy strings or drop the extra fields
You can write a function that checks the length: if a row has fewer fields than the schema, it appends dummy strings; if it has more, it drops the extra fields.
val requiredNumberOfFields = schema.fieldNames.length
// Pad rows that are too short with "dummy" values and truncate rows that are too long,
// so every row matches the length of the schema.
def appendDummyData(row: Array[String], len: Int): Array[String] =
  if (row.length == len) row
  else if (row.length < len) row ++ Array.fill(len - row.length)("dummy")
  else row.take(len)
val data = sqlContext
  .createDataFrame(
    rdd
      .filter(!_.contains("uniqueFundamentalSet"))
      .map(line => line.split("\\|\\^\\|"))
      .map(x => Row.fromSeq(appendDummyData(x, requiredNumberOfFields).toSeq)), // normalise each row's length before applying the schema
    schema)
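For illustration, here is how appendDummyData behaves on rows that are too short or too long (hypothetical sample arrays, not taken from the actual data set):
val tooShort = Array("a", "b")           // fewer fields than required
val tooLong = Array("a", "b", "c", "d")  // more fields than required
println(appendDummyData(tooShort, 3).mkString("|^|")) // prints a|^|b|^|dummy
println(appendDummyData(tooLong, 3).mkString("|^|"))  // prints a|^|b|^|c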
I hope the answer is helpful.