Spark在stdout上丢失了println()

Eda*_*ame 17 scala println accumulator apache-spark

我有以下代码:

val blueCount = sc.accumulator[Long](0)
val output = input.map { data =>
  for (value <- data.getValues()) {
    if (record.getEnum() == DataEnum.BLUE) {
      blueCount += 1
      println("Enum = BLUE : " + value.toString()
    }
  }
  data
}.persist(StorageLevel.MEMORY_ONLY_SER)

output.saveAsTextFile("myOutput")
Run Code Online (Sandbox Code Playgroud)

然后blueCount不为零,但我没有println()输出!我在这里错过了什么吗?谢谢!

Alb*_*nto 18

这是一个概念性问题......

想象一下,你有一个庞大的集群,由许多工人组成,让我们说n工人和那些工人存储一个RDD或者的分区DataFrame,想象你map在这些数据中开始一个任务,并且在里面map你有一个print声明,首先:

  • 该数据将在何处打印出来?
  • 什么节点有优先级和什么分区?
  • 如果所有节点并行运行,谁将首先打印?
  • 如何创建此打印队列?

这些问题太多了,因此设计师/维护者在apache-spark逻辑上决定放弃对print任何map-reduce操作(包括accumulators甚至broadcast变量)内部语句的任何支持.

这也很有意义,因为Spark是一种为非常大的数据集而设计的语言.虽然打印对测试和调试很有用,但您不希望打印DataFrame或RDD的每一行,因为它们构建为具有数百万或数十亿行!那么,当你甚至不想首先打印时,为什么要处理这些复杂的问题呢?

为了证明这一点,您可以运行此scala代码,例如:

// Let's create a simple RDD
val rdd = sc.parallelize(1 to 10000)

def printStuff(x:Int):Int = {
  println(x)
  x + 1
}

// It doesn't print anything! because of a logic design limitation!
rdd.map(printStuff)

// But you can print the RDD by doing the following:
rdd.take(10).foreach(println)
Run Code Online (Sandbox Code Playgroud)

  • 我相信println工作正常:它只是运行一个spark执行器的计算机上的stdout/stderr.因此,除非您有办法捕获这些日志中的内容,否则您将永远不会看到它.如果你正在使用纱线,那么有一个命令可以为你打印出来. (12认同)