为什么foreach没有为驱动程序带来任何东西?

Kno*_*uch 13 apache-spark

我用spark shell编写了这个程序

val array = sc.parallelize(List(1, 2, 3, 4))
array.foreach(x => println(x))
Run Code Online (Sandbox Code Playgroud)

这打印一些调试语句,但不打印实际数字.

下面的代码工作正常

for(num <- array.take(4)) {
  println(num)
}
Run Code Online (Sandbox Code Playgroud)

我确实理解这take是一个动作,因此会引发火花触发懒惰的计算.

但是foreach应该以同样的方式工作......为什么没有foreach从spark中带回任何东西并开始进行实际处理(退出懒惰模式)

我怎样才能在rdd上工作?

Nig*_*olf 37

RDD.foreachSpark中的方法在集群上运行,因此包含这些记录的每个worker都在运行操作foreach.即你的代码正在运行,但它们是在Spark worker stdout上打印出来的,而不是在驱动程序/你的shell会话中打印出来的.如果查看Spark worker的输出(stdout),您将看到这些打印到控制台.

您可以通过转到为每个正在运行的执行程序运行的web gui来查看worker上的stdout.示例URL为http:// workerIp:workerPort/logPage /?appId = app-20150303023103-0043&executorId = 1&logType = stdout

Spark Executor Stdout

在此示例中,Spark选择将RDD的所有记录放在同一分区中.

如果您考虑它,这是有道理的 - 查看函数签名foreach- 它不会返回任何内容.

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit
Run Code Online (Sandbox Code Playgroud)

这实际上是foreachscala 的目的- 它用于副作用.

当您收集记录时,将它们带回驱动程序,因此逻辑上收集/执行操作只是在Spark驱动程序中的Scala集合上运行 - 您可以看到日志输出,因为spark驱动程序/ spark shell是什么打印到stdout中会话.

foreach的用例可能看起来不是很明显,例如 - 如果RDD中的每个记录都要做一些外部行为,比如调用REST api,你可以在foreach中执行此操作,然后每个Spark工作者都会提交一个使用值调用API服务器.如果foreach确实带回了记录,你可以很容易地在驱动程序/ shell进程中烧掉内存.这样可以避免这些问题,并且可以对群集中RDD中的所有项执行副作用.

如果你想看看我使用的RDD中有什么;

array.collect.foreach(println) 
//Instead of collect, use take(...) or takeSample(...) if the RDD is large
Run Code Online (Sandbox Code Playgroud)


Mar*_*cok 5

您可以使用 RDD.toLocalIterator() 将数据传送到驱动程序(一次一个 RDD 分区):

val array = sc.parallelize(List(1, 2, 3, 4))
for(rec <- array.toLocalIterator) { println(rec) }
Run Code Online (Sandbox Code Playgroud)

也可以看看