Spark MLlib - trainImplicit warning

Tar*_*ula 14 python apache-spark pyspark apache-spark-mllib

I am seeing these warnings when using trainImplicit:

WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB).
The maximum recommended task size is 100 KB.

And then the task size starts to increase. I tried calling repartition on the input RDD, but the warnings stay the same.

All these warnings come from the ALS iterations, from flatMap and also from aggregate. For example, this is the origin of the stage in which flatMap shows these warnings (with Spark 1.3.0, but they also show up in Spark 1.3.1):

org.apache.spark.rdd.RDD.flatMap(RDD.scala:296)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

And from aggregate:

org.apache.spark.rdd.RDD.aggregate(RDD.scala:968)
org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

Vit*_*t D 1

A similar problem is described on the Apache Spark mailing list - http://apache-spark-user-list.1001560.n3.nabble.com/Large-Task-Size-td9539.html

I think you can try tuning the number of partitions (using the repartition() method), depending on how many hosts, how much RAM, and how many CPUs you have.

Also try to investigate all the steps via the Web UI, where you can see the number of stages, the memory usage of each stage, and data locality.

Alternatively, as long as everything works correctly and fast, you can just ignore this warning.
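If you do decide to ignore it, one way to silence just this warning is to raise the log level of the class that emits it. This is a sketch for the default log4j 1.x setup that Spark ships with; add a line like the following to conf/log4j.properties:

```properties
# Suppress the "task of very large size" warning only; other scheduler
# warnings from different classes are unaffected.
log4j.logger.org.apache.spark.scheduler.TaskSetManager=ERROR
```

This keeps the rest of Spark's logging intact while dropping WARN-level messages from TaskSetManager.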

This notice is hard-coded in Spark (scheduler/TaskSetManager.scala):

      if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
          !emittedTaskSizeWarning) {
        emittedTaskSizeWarning = true
        logWarning(s"Stage ${task.stageId} contains a task of very large size " +
          s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
          s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
      }

private[spark] object TaskSetManager {
  // The user will be warned if any stages contain a task that has a serialized size greater than
  // this.
  val TASK_SIZE_TO_WARN_KB = 100
} 
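To make the check concrete, here is a small standalone Python sketch (not Spark code) that mirrors the hard-coded condition above and shows why the 208 KB task from the question trips the 100 KB threshold:

```python
# Same constant as TASK_SIZE_TO_WARN_KB in TaskSetManager.scala
TASK_SIZE_TO_WARN_KB = 100

def task_size_warning(serialized_task_bytes):
    """Return the warning message Spark would log, or None if the
    serialized task is under the recommended limit."""
    if serialized_task_bytes > TASK_SIZE_TO_WARN_KB * 1024:
        return ("task of very large size (%d KB); the maximum recommended "
                "task size is %d KB"
                % (serialized_task_bytes // 1024, TASK_SIZE_TO_WARN_KB))
    return None

print(task_size_warning(208 * 1024))  # the 208 KB task from the question
print(task_size_warning(50 * 1024))   # prints None: under the threshold
```

Note that the warning is purely advisory: nothing fails when it fires, it only signals that the closure or partition metadata being shipped with each task is larger than Spark's rule of thumb.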