Sky*_*ker 5 java mapreduce akka
我正试图在Akka之上实现MapReduce,很幸运能找到Akka Essentials一书的代码.但是,我发现这个示例实现有两个主要问题,两者看起来都像基本的并发设计缺陷,在一本关于Akka的书中找到它是非常令人震惊的:
完成后,客户端将调用,shutdown()但此时无法保证消息通过WCMapReduceServer.我看到WCMapReduceServer在任何时候只获取部分数量的客户端消息,然后WCMapReduceServer输出[INFO] [06/25/2013 09:30:01.594] [WCMapReduceApp-5] [ActorSystem(WCMapReduceApp)] REMOTE: RemoteClientShutdown@akka://ClientApplication@192.168.224.65:2552意味着客户端shutdown()在客户端实际设法刷新所有未决消息之前发生.在客户端代码行41中,我们看到shutdown()发生时没有先刷新.在关闭系统之前,Akka是否有办法强制执行刷新出站消息?
另一个我已经修复过的更大的缺陷是用于向MapReduce服务器发出EOF信号的方式,主要任务(文字文件)完成后,给定所有子任务(文件的每一行)都完成.他发送一个特殊的字符串消息,DISPLAY_LIST并且此消息排队的优先级最低,请参阅代码.这里的一个重大缺陷是即使DISPLAY_LIST具有最低优先级,如果任何Map(或Reduce)任务任意长,则DISPLAY_LIST在所有MapReduce子任务完成之前消息将通过,因此该MapReduce示例的结果是非确定性的,即每次运行都可以得到不同的词典.可以通过替换MapActor#onReceive实现来揭示该问题 使用以下内容即使一个Map步骤任意长:
public void onReceive(Object message) {
System.out.println("MapActor -> onReceive(" + message + ")");
if (message instanceof String) {
String work = (String) message;
// ******** BEGIN SLOW DOWN ONE MAP REQUEST
if ("Thieves! thieves!".equals(work)) {
try {
System.out.println("*** sleeping!");
Thread.sleep(5000);
System.out.println("*** back!");
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
// ******** END SLOW DOWN ONE MAP REQUEST
// perform the work
List<Result> list = evaluateExpression(work);
// reply with the result
actor.tell(list);
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
}
Run Code Online (Sandbox Code Playgroud)进一步阅读这本书后发现:
我们有Thread.sleep()因为无法保证消息的处理顺序.第一个Thread.sleep()方法确保 在发送Result消息之前完全处理所有字符串句子消息.
对不起,我Thread.sleep()从来没有办法确保并发性.因此,难怪这样的书会在他们的例子中充满基本的并发缺陷.
我已经解决了这两个问题,并且还将代码迁移到最新的Akka版本2.2-M3。
第一个问题的解决方案是让MapReduce远程MasterActor在收到所有消息发送完毕后从客户端发送的TaskInfo通知后立即发送ShutdownInfo通知。TaskInfo 包含MapReduce 任务有多少个子任务的信息,例如在本例中文本文件中有多少行。
第二个问题的解决方案是发送带有子任务总数的TaskInfo。这里,AggregatorActor 计算它已处理的子任务数量,将其与 TaskInfo 进行比较,并在它们匹配时发出信号表明作业已完成(当前仅打印一条消息)。
输出中显示了有趣且正确的行为:
system.shutdown()并且 Client 终止。请注意,MapReduce 仍在处理过程中,客户端关闭不会造成干扰。代码可以从我的存储库中获取: https://github.com/bravegag/akka-mapreduce-example
随时欢迎反馈。