我有一个使用Apache Spark的Java程序.该程序中最有趣的部分如下所示:
long seed = System.nanoTime();
JavaRDD<AnnotatedDocument> annotated = documents
.mapPartitionsWithIndex(new InitialAnnotater(seed), true);
annotated.cache();
for (int iter = 0; iter < 2000; iter++) {
GlobalCounts counts = annotated
.mapPartitions(new GlobalCounter())
.reduce((a, b) -> a.sum(b)); // update overall counts (*)
seed = System.nanoTime();
// copy overall counts which CountChanger uses to compute a stochastic thing (**)
annotated = annotated
.mapPartitionsWithIndex(new CountChanger(counts, seed), true);
annotated.cache();
// adding these lines causes constant time complexity like i want
//List<AnnotatedDocument> ll = annotated.collect();
//annotated = sc.parallelize(ll, 8);
}
Run Code Online (Sandbox Code Playgroud)
因此,实际上,行(**)会导致RDD形式
documents
.mapPartitionsWithIndex(initial)
.mapPartitionsWithIndex(nextIter)
.mapPartitionsWithIndex(nextIter)
.mapPartitionsWithIndex(nextIter)
... 2000 more
Run Code Online (Sandbox Code Playgroud)
确实是一个很长的地图链.此外,由于需要更新计数,因此行(*)会在每次迭代时强制计算(非惰性).
我遇到的问题是,我得到的时间复杂度随着每次迭代而线性增加,因此总体上呈二次方式:
我认为这是因为Spark试图"记住"链中的每个RDD,以及容错算法或导致其增长的任何因素.但是,我真的不知道.
我真正想做的是在每次迭代时告诉Spark"崩溃"RDD,以便只有最后一个保存在内存中并继续工作.我认为这应该导致每次迭代的时间不变.这可能吗?还有其他解决方案吗?
谢谢!
尝试使用rdd.checkpoint.这将把RDD保存到hdfs并清除沿袭.
每次转换RDD时,都会增加谱系,Spark必须跟踪可用内容和必须重新计算的内容.处理DAG是昂贵的,并且大型DAG倾向于非常快地杀死性能.通过"检查点",您可以指示Spark计算并保存生成的RDD,并丢弃其创建方式的信息.这使得它类似于简单地保存RDD并将其读回,从而最大限度地减少DAG操作.
在旁注中,由于您遇到了这个问题,因此最好union通过添加来影响RDD性能,steps并且还可能StackOverflowError因为沿袭信息的方式而抛出.看这篇文章
这个链接有更多细节与漂亮的图表,这个主题也在这篇SO帖子中提到.
| 归档时间: |
|
| 查看次数: |
594 次 |
| 最近记录: |