Spark SQL throws a UTF8String cast error


The Spark SQL window function does not seem to work correctly. I am running a Spark job on a Hadoop cluster with an HDFS block size of 128 MB, on Spark 1.5 (CDH 5.5).

I am reading an Avro file and performing the following operations.

My requirement:

If there are multiple records with the same data_rfe_id, take a single record based on the maximum seq_id and the maximum service_id.

I see that in the raw data some records have the same data_rfe_id and the same seq_id, so I applied row_number using a Window function so that I can filter the records with row_num === 1.

I want to achieve this using window functions only.

Why does this happen?

Do I need to repartition the DataFrame before applying the window function?

It throws the exception below only for some tasks, and the job fails after a task has failed 4 times.

When do we run into this kind of exception?

 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.rowNumber
 .....

scala> df.printSchema
root
 |-- transitional_key: string (nullable = true)
 |-- seq_id: string (nullable = true)
 |-- data_rfe_id: string (nullable = true)
 |-- service_id: string (nullable = true)
 |-- event_start_date_time: string (nullable = true)
 |-- event_id: string (nullable = true)


 val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id").desc)
 val rankDF = df.withColumn("row_num", rowNumber.over(windowFunction))
 rankDF.select("data_rfe_id", "seq_id", "service_id", "row_num").show(200, false)
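
For reference, a minimal sketch of the full dedup pattern the stated requirement implies: ordering by both seq_id and service_id and keeping only the top row per data_rfe_id. The int casts are an assumption, since the schema stores both columns as strings and a plain string sort would put "9" after "10":

 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.{col, rowNumber}

 // Assumption: seq_id and service_id hold numeric text, so cast them to int
 // before ordering; without the cast the sort is lexicographic.
 val w = Window
   .partitionBy(col("data_rfe_id"))
   .orderBy(col("seq_id").cast("int").desc, col("service_id").cast("int").desc)

 // Keep only the top-ranked record per data_rfe_id, then drop the helper column.
 val deduped = df
   .withColumn("row_num", rowNumber.over(w))
   .filter(col("row_num") === 1)
   .drop("row_num")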

In my code I am not doing any casts; I just read everything as strings.

When I run the above code in spark-shell, I get the correct results.

However, if I run the same code via spark-submit by providing a jar, it throws the following exception:

  Caused by: java.lang.ClassCastException:  org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)
at org.apache.spark.sql.catalyst.expressions.JoinedRow.getInt(JoinedRow.scala:82)
at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:45)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:121)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:82)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:61)
at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:330)
at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Could someone explain why I am getting the above error, and how can I fix it?

Answer (score: -2)

The data in the source may have been modified, and the job fails because of a data type issue. To resolve this error, check the data in the source, remove the unnecessary files, and it should work.
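
A minimal sketch of one way to guard against such a type mismatch, assuming every column is supposed to be a string as in the printed schema: cast each column explicitly to the expected type before applying the window, so rows from a modified source file are coerced up front instead of breaking the window operator at runtime.

 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.StringType

 // Assumption: all six columns should be strings, matching df.printSchema above.
 val expectedColumns = Seq("transitional_key", "seq_id", "data_rfe_id",
                           "service_id", "event_start_date_time", "event_id")

 // Explicitly cast every column, normalizing rows whose physical type differs.
 val normalized = expectedColumns.foldLeft(df) { (acc, name) =>
   acc.withColumn(name, col(name).cast(StringType))
 }

 // Verify the schema before applying the window function.
 normalized.printSchema()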