我用spark shell编写了这个程序
val array = sc.parallelize(List(1, 2, 3, 4))
array.foreach(x => println(x))
Run Code Online (Sandbox Code Playgroud)
这打印一些调试语句,但不打印实际数字.
下面的代码工作正常
for(num <- array.take(4)) {
println(num)
}
Run Code Online (Sandbox Code Playgroud)
我确实理解这take是一个动作,因此会引发火花触发懒惰的计算.
但是foreach应该以同样的方式工作......为什么没有foreach从spark中带回任何东西并开始进行实际处理(退出懒惰模式)
我怎样才能在rdd上工作?
Nig*_*olf 37
RDD.foreachSpark中的方法在集群上运行,因此包含这些记录的每个worker都在运行操作foreach.即你的代码正在运行,但它们是在Spark worker stdout上打印出来的,而不是在驱动程序/你的shell会话中打印出来的.如果查看Spark worker的输出(stdout),您将看到这些打印到控制台.
您可以通过转到为每个正在运行的执行程序运行的web gui来查看worker上的stdout.示例URL为http:// workerIp:workerPort/logPage /?appId = app-20150303023103-0043&executorId = 1&logType = stdout

在此示例中,Spark选择将RDD的所有记录放在同一分区中.
如果您考虑它,这是有道理的 - 查看函数签名foreach- 它不会返回任何内容.
/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit
Run Code Online (Sandbox Code Playgroud)
这实际上是foreachscala 的目的- 它用于副作用.
当您收集记录时,将它们带回驱动程序,因此逻辑上收集/执行操作只是在Spark驱动程序中的Scala集合上运行 - 您可以看到日志输出,因为spark驱动程序/ spark shell是什么打印到stdout中会话.
foreach的用例可能看起来不是很明显,例如 - 如果RDD中的每个记录都要做一些外部行为,比如调用REST api,你可以在foreach中执行此操作,然后每个Spark工作者都会提交一个使用值调用API服务器.如果foreach确实带回了记录,你可以很容易地在驱动程序/ shell进程中烧掉内存.这样可以避免这些问题,并且可以对群集中RDD中的所有项执行副作用.
如果你想看看我使用的RDD中有什么;
array.collect.foreach(println)
//Instead of collect, use take(...) or takeSample(...) if the RDD is large
Run Code Online (Sandbox Code Playgroud)
您可以使用 RDD.toLocalIterator() 将数据传送到驱动程序(一次一个 RDD 分区):
val array = sc.parallelize(List(1, 2, 3, 4))
for(rec <- array.toLocalIterator) { println(rec) }
Run Code Online (Sandbox Code Playgroud)
也可以看看
| 归档时间: |
|
| 查看次数: |
12359 次 |
| 最近记录: |