Failed to execute user defined function ($anonfun$9: (string) => double) when using StringIndexer on multiple columns


I am trying to apply StringIndexer to multiple columns. Here is my code:

val stringIndexers = Categorical_Model.map { colName =>
  new StringIndexer().setInputCol(colName).setOutputCol(colName + "_indexed")
}

var dfStringIndexed = stringIndexers(0).fit(df3).transform(df3) // 'fit's a model then 'transform's data
for (x <- 1 to stringIndexers.length - 1) {
  dfStringIndexed = stringIndexers(x).fit(dfStringIndexed).transform(dfStringIndexed)
}
dfStringIndexed = dfStringIndexed.drop(Categorical_Model: _*)
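For reference, the same multi-column indexing can also be expressed as a single Pipeline that is fitted and transformed once. This is a minimal sketch assuming the stringIndexers array and Categorical_Model list defined above; the result variable name is illustrative:

import org.apache.spark.ml.Pipeline

// Chain all the indexers as pipeline stages: one fit and one transform
// (on Spark versions before 2.3, map the array to Array[PipelineStage] first)
val pipeline = new Pipeline().setStages(stringIndexers)
val dfIndexedViaPipeline = pipeline
  .fit(df3)
  .transform(df3)
  .drop(Categorical_Model: _*)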

The schema shows nullable = false for all of the columns.

The stringIndexers array looks like this:

stringIndexers: Array[org.apache.spark.ml.feature.StringIndexer] = Array(strIdx_c53c3bdf464c, strIdx_61e685c520f7, strIdx_d6e59b2fc69d, ......)


dfStringIndexed.show(10)

This throws the following error:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$9: (string) => double)

Why can the schema be printed but no data is available?

Update: The code works if I chain the string indexers manually like this instead of using a loop. Which is odd.

var dfStringIndexed = stringIndexers(0).fit(df3).transform(df3) // 'fit's a model then 'transform's data
dfStringIndexed = stringIndexers(1).fit(dfStringIndexed).transform(dfStringIndexed)
dfStringIndexed = stringIndexers(2).fit(dfStringIndexed).transform(dfStringIndexed)
dfStringIndexed = stringIndexers(3).fit(dfStringIndexed).transform(dfStringIndexed)
dfStringIndexed = stringIndexers(4).fit(dfStringIndexed).transform(dfStringIndexed)
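The same chaining can also be written without mutation as a fold over the indexers (a minimal sketch equivalent to the loop in the original code; the variable name is illustrative):

// Same chaining as the for-loop, expressed as a fold over the indexers
val dfIndexedViaFold = stringIndexers
  .foldLeft(df3) { (df, indexer) => indexer.fit(df).transform(df) }
  .drop(Categorical_Model: _*)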

Adding the stack trace as requested:

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3273)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
  ... 63 elided
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$9: (string) => double)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  ... 3 more
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
  at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$9.apply(StringIndexer.scala:251)
  at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$9.apply(StringIndexer.scala:246)
  ... 19 more

小智 6

I ran into a similar problem, even on a small subset of 50 rows, none of which had nulls in the column I was string indexing. And it did not work even when I ran the indexers manually.

I was able to avoid the error by including .setHandleInvalid("keep"), and I checked the output: it does not do anything strange like setting everything to 0 or to the same value. I am still not happy with this resolution, though, since it seems rather unsafe. I would be very interested to know whether you found a more reasonable explanation and solution!

dfStringIndexed = stringIndexers(1).setHandleInvalid("keep").fit(dfStringIndexed).transform(dfStringIndexed)
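If that fixes it for you, the option can also be set once for every indexer at construction time instead of per transform (a minimal sketch reusing the Categorical_Model column list from the question):

// Set handleInvalid on every indexer up front.
// "keep" maps NULL / unseen labels to an extra index; "skip" drops those rows.
val stringIndexers = Categorical_Model.map { colName =>
  new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "_indexed")
    .setHandleInvalid("keep")
}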

I think it might also be fixable by changing the nullability of the column, even if it does not contain nulls. I did that following what is described here:

Can I change the nullability of a column in my Spark dataframe?
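A minimal sketch of that approach, rebuilding the DataFrame with a schema whose fields are flipped to the desired nullability (the setNullable helper name is mine, not from that question):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Recreate the DataFrame with every field's nullable flag set as requested;
// the data itself is unchanged, only the schema metadata differs.
def setNullable(df: DataFrame, nullable: Boolean): DataFrame = {
  val newSchema = StructType(df.schema.map(_.copy(nullable = nullable)))
  df.sparkSession.createDataFrame(df.rdd, newSchema)
}

// Usage: val df3Nullable = setNullable(df3, nullable = true)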