Spark:作业卡在 100 的最后 2 个任务上

Geo*_* L. 6 bigdata apache-spark

我是 Spark 的新手,我必须支持由我们的顾问编写的应用程序。我阅读并观看了大量有关 Spark 的信息,但我仍然在为正确调整工作的小细节而苦苦挣扎。

场景:

  1. 包含 5 条清理规则的 Java 类,我们将这些规则应用于 4 亿条记录的 RDD。
  2. 分区数设置为 1000。
  3. 最后的“操作”是在 S3 上写入,但在此之前我们将分区数量减少到 100。
  4. Spark UI 显示进度,但不幸的是在保存的最后阶段,任务卡在 98/100
  5. 我没有使用 .collect() 但我使用 .map() 和 Spark SQL。

这是我用来编写的代码:

rdd.coalesce(100)
   .write().mode("append")
   .partitionBy("year", "month")
   .format(SPARK_AVRO_PACKAGE)
   .save(appProps.getProperty(PAGEVIEW_CLEANSED));
Run Code Online (Sandbox Code Playgroud)

我不确定是否应该努力改进代码或调整 spark/cluster 的性能。

更新:我认为这段代码是我遇到的问题的原因。我在 SO 上找到了一个类似的帖子(Spark 不会将负载平均分配给任务),我只是不确定如何在我的情况下使用广播。

Dataset<Row> duplicatePrefetchPrerenderHashDS = 
            hashedPageViewDS
              .select(hashedPageViewDS.col(PREFETCH_PRERENDER_HASH))
              .groupBy(hashedPageViewDS.col(PREFETCH_PRERENDER_HASH))
              .count()
              .withColumnRenamed("count", "cnt")
              .where("cnt>1");
Run Code Online (Sandbox Code Playgroud)

小智 0

您可以采取多种方法:

  1. 您可以尝试执行 allocateBy("year", "month") 这将确保只有 1 个分区写入每个文件夹。如果数据均匀分布在年份和月份中。

  2. 如果问题实际上是某些年份的偏差。然后我会说使用repartition(1000) 和distributeBy("year", "month","COL1")。在上面的示例中,COL1 将是几乎均匀分布的列,如 DAY of MONTH 或 DATE。现在,COL1 将决定不写入任何文件(30 表示有某天),而不是向每个分区写入 200 个(默认随机排序值)文件

  3. 另一件有用的事情是使用重新分区(100)而不是合并(100),因为重新分区将均匀分布数据,导致更多分区拥有输出中每个分区的数据。