我的程序遵循迭代的map/reduce方法.如果满足某些条件,它需要停止.无论如何,我可以设置一个全局变量,可以分布在所有map/reduce任务中,并检查全局变量是否达到完成条件.
像这样的东西.
While(Condition != true){
Configuration conf = getConf();
Job job = new Job(conf, "Dijkstra Graph Search");
job.setJarByClass(GraphSearch.class);
job.setMapperClass(DijkstraMap.class);
job.setReducerClass(DijkstraReduce.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
}
Run Code Online (Sandbox Code Playgroud)
where条件是在每个map/reduce执行期间/之后修改的全局变量.
每次运行map-reduce作业时,都可以检查输出的状态,计数器中包含的值等,并在控制迭代的节点上决定是否要再进行一次迭代.我想我不明白在你的场景中对全局状态的需求来自何处.
更一般地说 - 在执行节点之间共享状态有两种主要方式(尽管应该注意,最好避免共享状态,因为它限制了可伸缩性).
您可以使用Configuration.set(String name,String value)设置您可以在Mappers/Reducers/etc中访问的值:
在你的司机:
conf.set("my.dijkstra.parameter", "value");
Run Code Online (Sandbox Code Playgroud)
例如在你的映射器中:
public void configure(JobConf job) {
myParam = job.get("my.dijkstra.parameter");
}
Run Code Online (Sandbox Code Playgroud)
但这不太可能帮助您查看以前作业的输出,以决定是否再开始一次迭代.也就是说,在执行作业后,该值不会被推回.
您还可以使用Hadoop的DistributedCache来存储将在所有节点之间分发的文件.如果你要通过这种方式传递的值很小,这比仅仅在HDFS上存储一些东西要好一些.
当然,计数器也可用于此目的.但是,为了在算法中做出决策,它们看起来不太可靠.看起来在某些情况下,它们可以递增两次(如果某个任务执行了一次以上,例如在失败或投机执行的情况下) - 我不确定.