Spark:即使输出数据非常小,合并也很慢

Eda*_*ame 11 scala coalesce apache-spark

我在Spark中有以下代码:

myData.filter(t => t.getMyEnum() == null)
      .map(t => t.toString)
      .saveAsTextFile("myOutput")
Run Code Online (Sandbox Code Playgroud)

myOutput文件夹中有2000多个文件,但只有少数t.getMyEnum()== null,因此输出记录非常少.由于我不想只搜索2000+输出文件中的几个输出,我尝试使用coalesce组合输出,如下所示:

myData.filter(t => t.getMyEnum() == null)
      .map(t => t.toString)
      .coalesce(1, false)
      .saveAsTextFile("myOutput")
Run Code Online (Sandbox Code Playgroud)

然后工作变得极其缓慢!我想知道为什么这么慢?在2000多个分区中只有几个输出记录散布?有没有更好的方法来解决这个问题?

Zia*_*ani 15

如果你正在做一个激烈的合并,例如numPartitions = 1,这可能导致你的计算发生在比你想要的更少的节点上(例如,在numPartitions = 1的情况下,一个节点).为避免这种情况,您可以传递shuffle = true.这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么).

注意:使用shuffle = true,您实际上可以合并到更大数量的分区.如果您有少量分区(例如100),可能会使一些分区异常大,这很有用.调用coalesce(1000,shuffle = true)将导致1000个分区,并使用散列分区器分发数据.

所以尝试将true传递给coalesce函数.即

myData.filter(_.getMyEnum == null)
      .map(_.toString)
      .coalesce(1, shuffle = true)
      .saveAsTextFile("myOutput")
Run Code Online (Sandbox Code Playgroud)

  • `coalesce(1,shuffle = true)`相当于`repartition(1)`? (6认同)
  • 是的,是一样的:如果您查看源代码,则默认情况下repartition(1)的shuffle设置为true。 (3认同)
  • 谢谢你。它将我 10 分钟的任务优化为只有 1.8 (2认同)