blu*_*lds 8 csv scala row decode apache-spark
I'm trying to load some CSV data into a Spark cluster and run some queries against it, but I'm having trouble loading the data.
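See the code sample below. I've generated a header and I'm trying to parse the columns, but when I run it against a large dataset with many columns, the process fails with a cryptic error message: '[Ljava.lang.String; is not a valid external type for schema of string'.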
This doesn't seem to be covered anywhere else online. Does anyone know what the problem might be?
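(I originally thought it might have something to do with null or empty fields being loaded, but the process only fails some time into the run, and the source data is very sparse.)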
var headers = StructType(header_clean.split(",").map(fieldName => StructField(fieldName, StringType, true)))
var contentRdd = contentNoHeader.map(k => k.split(",")).map(
p => Row(p.map( x => x.replace("\"", "").trim)))
spark.createDataFrame(contentRdd, headers).createOrReplaceTempView("someView")
val domains = spark.sql("SELECT DISTINCT domain FROM someView")
For reference, here is the bottom of the error log (it's very noisy; there are a lot of columns):
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true) AS pageUrl#377
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 87
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    ... 3 more
Caused by: java.lang.RuntimeException: [Ljava.lang.String; is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
    ... 17 more
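For context on the `Caused by` line: `[Ljava.lang.String;` is the JVM's binary class name for a `String[]`, so the encoder is being handed an array where the schema expects a single string. The plain-Scala check below (no Spark needed) illustrates this under one assumption: `fakeRow` is a hypothetical stand-in that only copies the varargs shape of Spark's `Row.apply(values: Any*)` and returns the collected arguments so they can be counted.

```scala
// Plain Scala, no Spark needed: what "[Ljava.lang.String;" refers to.
object ExternalTypeDemo {
  // Hypothetical stand-in with the same varargs shape as Row.apply(values: Any*);
  // it just returns the collected arguments so we can count them.
  def fakeRow(values: Any*): Seq[Any] = values

  def main(args: Array[String]): Unit = {
    val fields: Array[String] = "a,b,c".split(",")

    // The JVM binary name of String[] is exactly the text in the error message.
    println(fields.getClass.getName) // [Ljava.lang.String;

    // Passing the array as-is: ONE argument, whose value is the whole array.
    val asOneField = fakeRow(fields)
    println(asOneField.size) // 1
    println(asOneField.head.getClass.getName) // [Ljava.lang.String;

    // Expanding it with `: _*`: one argument per element.
    val asThreeFields = fakeRow(fields.toSeq: _*)
    println(asThreeFields.size) // 3
  }
}
```

Read this way, `Row(p.map(...))` likely produces a one-field row holding the whole array, while the schema declares one `StringType` field per header column.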
小智 1
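I solved this by splitting the values out into separate `Row` fields, passing each one as its own argument instead of passing the whole array. You can do it like this: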
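import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

var headers = StructType(header_clean.split(",").map(fieldName => StructField(fieldName, StringType, true)))
var contentRdd = contentNoHeader.map(k => k.split(",")).map(
p => {
  val ppp = p.map( x => x.replace("\"", "").trim)
  // one argument per column, so each field gets a single String
  // (this hard-codes three columns; extend it to match your header)
  Row(ppp(0), ppp(1), ppp(2))
})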
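The three hard-coded `ppp(i)` arguments only fit a three-column file. A minimal sketch of the per-line parsing that does not depend on the column count is below, in plain Scala so it runs without Spark. `parseLine` and `checkArity` are hypothetical helpers, not Spark API. The sketch also uses `split(",", -1)`: with the one-argument `split(",")`, Java drops trailing empty strings, which matters for very sparse data like the question describes.

```scala
// Plain Scala sketch of per-line parsing that does not hard-code the column count.
object LineParser {
  // limit = -1 keeps trailing empty strings. With plain split(","), Java drops
  // them, so a sparse line like "a,b,," yields 2 fields instead of 4.
  def parseLine(line: String): Seq[String] =
    line.split(",", -1).map(_.replace("\"", "").trim).toSeq

  // Hypothetical guard: fail with a readable message when a line's field count
  // does not match the header, instead of failing later with a less obvious error.
  def checkArity(fields: Seq[String], expected: Int): Seq[String] = {
    require(fields.length == expected,
      s"expected $expected fields but got ${fields.length}: ${fields.mkString(",")}")
    fields
  }

  def main(args: Array[String]): Unit = {
    val sparse = "a,b,,"
    println(sparse.split(",").length) // 2
    println(parseLine(sparse).length) // 4
    println(parseLine("\"x\", y ,z").mkString("|")) // x|y|z
    println(checkArity(parseLine(sparse), 4).length) // 4
  }
}
```

In the Spark job, each line could then become `Row.fromSeq(checkArity(parseLine(line), headers.length))`, since `Row.fromSeq` takes a `Seq[Any]` and `StructType` is a `Seq[StructField]`. This is still a naive split: a quoted value containing a comma is cut in two. Spark 2.x's built-in reader, `spark.read.option("header", "true").csv(path)`, handles quoting.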
Views: 6486