使用Akka实现MapReduce

Sky*_*ker 5 java mapreduce akka

我正试图在Akka之上实现MapReduce,很幸运能找到Akka Essentials一书的代码.但是,我发现这个示例实现有两个主要问题,两者看起来都像基本的并发设计缺陷,在一本关于Akka的书中找到它是非常令人震惊的:

  1. 完成后,客户端将调用,shutdown()但此时无法保证消息通过WCMapReduceServer.我看到WCMapReduceServer在任何时候只获取部分数量的客户端消息,然后WCMapReduceServer输出[INFO] [06/25/2013 09:30:01.594] [WCMapReduceApp-5] [ActorSystem(WCMapReduceApp)] REMOTE: RemoteClientShutdown@akka://ClientApplication@192.168.224.65:2552意味着客户端shutdown()在客户端实际设法刷新所有未决消息之前发生.在客户端代码行41中,我们看到shutdown()发生时没有先刷新.在关闭系统之前,Akka是否有办法强制执行刷新出站消息?

  2. 另一个我已经修复过的更大的缺陷是用于向MapReduce服务器发出EOF信号的方式,主要任务(文字文件)完成后,给定所有子任务(文件的每一行)都完成.他发送一个特殊的字符串消息,DISPLAY_LIST并且此消息排队的优先级最低,请参阅代码.这里的一个重大缺陷是即使DISPLAY_LIST具有最低优先级,如果任何Map(或Reduce)任务任意长,则DISPLAY_LIST在所有MapReduce子任务完成之前消息将通过,因此该MapReduce示例的结果是非确定性的,即每次运行都可以得到不同的词典.可以通过替换MapActor#onReceive实现来揭示该问题 使用以下内容即使一个Map步骤任意长:

    public void onReceive(Object message) {
        System.out.println("MapActor -> onReceive(" + message + ")");
        if (message instanceof String) {
            String work = (String) message;
            // ******** BEGIN SLOW DOWN ONE MAP REQUEST
            if ("Thieves! thieves!".equals(work)) {
                try {
                    System.out.println("*** sleeping!");
                    Thread.sleep(5000);
                    System.out.println("*** back!");
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // ******** END SLOW DOWN ONE MAP REQUEST
    
            // perform the work
            List<Result> list = evaluateExpression(work);
    
            // reply with the result
            actor.tell(list);
    
        } else throw new IllegalArgumentException("Unknown message [" + message + "]");
    }
    
    Run Code Online (Sandbox Code Playgroud)

进一步阅读这本书后发现:

我们有Thread.sleep()因为无法保证消息的处理顺序.第一个Thread.sleep()方法确保 在发送Result消息之前完全处理所有字符串句子消息.

对不起,我Thread.sleep()从来没有办法确保并发性.因此,难怪这样的书会在他们的例子中充满基本的并发缺陷.

Sky*_*ker 1

我已经解决了这两个问题,并且还将代码迁移到最新的Akka版本2.2-M3。

第一个问题的解决方案是让MapReduce远程MasterActor在收到所有消息发送完毕后从客户端发送的TaskInfo通知后立即发送ShutdownInfo通知。TaskInfo 包含MapReduce 任务有多少个子任务的信息,例如在本例中文本文件中有多少行。

第二个问题的解决方案是发送带有子任务总数的TaskInfo。这里,AggregatorActor 计算它已处理的子任务数量,将其与 TaskInfo 进行比较,并在它们匹配时发出信号表明作业已完成(当前仅打印一条消息)。

输出中显示了有趣且正确的行为:

  • ClientActor 发送一堆消息,这些消息是“子任务”。请注意,身份请求模式用于访问远程 MapReduce MasterActor 的 ActorRef。
  • ClientActor 最后发送 TaskInfo 消息,说明之前发送了多少个子任务。
  • MasterActor 将 String 消息转发给 MapActor,MapActor 又转发给 ReduceActor
  • 一个MapActor 是一个很长的MapActor,即内容为“小偷!小偷!”的MapActor。这会稍微减慢 MapReduce 计算速度。
  • 同时MasterActor接收TaskInfo最后一条消息并将ShudownInfo发送回ClientActor
  • ClientActor 运行system.shutdown()并且 Client 终止。请注意,MapReduce 仍在处理过程中,客户端关闭不会造成干扰。
  • 冗长的 MapActor 返回,消息处理继续。
  • AggregatorActor 接收 TaskInfo 并通过计算子任务来确认子任务总数已完成并发出完成信号。

代码可以从我的存储库中获取: https://github.com/bravegag/akka-mapreduce-example

随时欢迎反馈。