使用 Akka Actor 进行文件操作

bum*_*bee 2 java actor akka

使用 Akka Actor 与普通文件操作方法相比有什么优势?我试图计算分析日志文件所需的时间。操作是查找登录次数超过50次的IP地址并显示出来。与 Akka Actor 模型相比,普通文件操作速度更快。为什么这样?

使用普通文件操作

public static void main(String[] args) {
        // TODO Auto-generated method stub
        //long startTime = System.currentTimeMillis();
        File file = new File("log.txt");
        Map<String, Long> ipMap = new HashMap<>();

        try {

                FileReader fr = new FileReader(file);
                BufferedReader br = new BufferedReader(fr);
                String line = br.readLine();

                while(line!=null) {
                    int idx = line.indexOf('-');
                    String ipAddress = line.substring(0, idx).trim();
                    long count = ipMap.getOrDefault(ipAddress, 0L);
                    ipMap.put(ipAddress, ++count);
                    line = br.readLine();
                }

                 System.out.println("================================");
                 System.out.println("||\tCount\t||\t\tIP");
                 System.out.println("================================");

                 fr.close();
                 br.close();
                 Map<String, Long> result = new HashMap<>();

                    // Sort by value and put it into the "result" map
                    ipMap.entrySet().stream()
                            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                            .forEachOrdered(x -> result.put(x.getKey(), x.getValue()));

                    // Print only if count > 50
                    result.entrySet().stream().filter(entry -> entry.getValue() > 50).forEach(entry ->
                        System.out.println("||\t" + entry.getValue() + "   \t||\t" + entry.getKey())
                    );

//                  long endTime = System.currentTimeMillis();
//                  System.out.println("Time: "+(endTime-startTime));

            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

    }

Using Actors:
1. The Main Class
 public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        // Create actorSystem
        ActorSystem akkaSystem = ActorSystem.create("akkaSystem");

        // Create first actor based on the specified class
        ActorRef coordinator = akkaSystem.actorOf(Props.create(FileAnalysisActor.class));

        // Create a message including the file path
        FileAnalysisMessage msg = new FileAnalysisMessage("log.txt");

        // Send a message to start processing the file. This is a synchronous call using 'ask' with a timeout.
        Timeout timeout = new Timeout(6, TimeUnit.SECONDS);
        Future<Object> future = Patterns.ask(coordinator, msg, timeout);

        // Process the results
        final ExecutionContext ec = akkaSystem.dispatcher();
        future.onSuccess(new OnSuccess<Object>() {
            @Override
            public void onSuccess(Object message) throws Throwable {
                if (message instanceof FileProcessedMessage) {
                    printResults((FileProcessedMessage) message);

                    // Stop the actor system
                    akkaSystem.shutdown();
                }
            }

            private void printResults(FileProcessedMessage message) {
                System.out.println("================================");
                System.out.println("||\tCount\t||\t\tIP");
                System.out.println("================================");

                Map<String, Long> result = new LinkedHashMap<>();

                // Sort by value and put it into the "result" map
                message.getData().entrySet().stream()
                        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                        .forEachOrdered(x -> result.put(x.getKey(), x.getValue())); 

                // Print only if count > 50
                result.entrySet().stream().filter(entry -> entry.getValue() > 50).forEach(entry ->
                    System.out.println("||\t" + entry.getValue() + "   \t||\t" + entry.getKey())
                );
                long endTime = System.currentTimeMillis();
                System.out.println("Total time: "+(endTime - startTime));
            }

        }, ec);

    }
Run Code Online (Sandbox Code Playgroud)

2.文件分析器类

public class FileAnalysisActor extends UntypedActor {

    private Map<String, Long> ipMap = new HashMap<>();
    private long fileLineCount;
    private long processedCount;
    private ActorRef analyticsSender = null;

    @Override
    public void onReceive(Object message) throws Exception {
        /*
            This actor can receive two different messages, FileAnalysisMessage or LineProcessingResult, any
            other type will be discarded using the unhandled method
         */
            //System.out.println(Thread.currentThread().getName());
        if (message instanceof FileAnalysisMessage) {

            List<String> lines = FileUtils.readLines(new File(
                    ((FileAnalysisMessage) message).getFileName()));

            fileLineCount = lines.size();
            processedCount = 0;

            // stores a reference to the original sender to send back the results later on
            analyticsSender = this.getSender();

            for (String line : lines) {
                // creates a new actor per each line of the log file
                Props props = Props.create(LogLineProcessor.class);
                ActorRef lineProcessorActor = this.getContext().actorOf(props);

                // sends a message to the new actor with the line payload
                lineProcessorActor.tell(new LogLineMessage(line), this.getSelf());
            }

        } else if (message instanceof LineProcessingResult) {

            // a result message is received after a LogLineProcessor actor has finished processing a line
            String ip = ((LineProcessingResult) message).getIpAddress();

            // increment ip counter
            Long count = ipMap.getOrDefault(ip, 0L);
            ipMap.put(ip, ++count);

            // if the file has been processed entirely, send a termination message to the main actor
            processedCount++;
            if (fileLineCount == processedCount) {
                // send done message
                analyticsSender.tell(new FileProcessedMessage(ipMap), ActorRef.noSender());
            }

        } else {
            // Ignore message
            this.unhandled(message);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

3.Logline处理器类

public class LogLineProcessor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof LogLineMessage) {
            // What data each actor process?
            //System.out.println("Line: " + ((LogLineMessage) message).getData());
            // Uncomment this line to see the thread number and the actor name relationship
           //System.out.println("Thread ["+Thread.currentThread().getId()+"] handling ["+ getSelf().toString()+"]");

            // get the message payload, this will be just one line from the log file
            String messageData = ((LogLineMessage) message).getData();

            int idx = messageData.indexOf('-');
            if (idx != -1) {
                // get the ip address
                String ipAddress = messageData.substring(0, idx).trim();

                // tell the sender that we got a result using a new type of message
                this.getSender().tell(new LineProcessingResult(ipAddress), this.getSelf());
            }
        } else {
            // ignore any other message type
            this.unhandled(message);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

消息类

  1. 文件分析消息

    公共类 FileAnalysisMessage {

    private String fileName;
    
    public FileAnalysisMessage(String file) {
        this.fileName = file;
    }
    
    public String getFileName() {
        return fileName;
    }
    
    Run Code Online (Sandbox Code Playgroud)

    }

2.文件处理消息

public class FileProcessedMessage {

    private Map<String, Long> data;

    public FileProcessedMessage(Map<String, Long> data) {
        this.data = data;
    }

    public Map<String, Long> getData() {
        return data;
    }
}
Run Code Online (Sandbox Code Playgroud)
  1. 线处理结果

    公共类 LineProcessingResult {

    private String ipAddress;
    
    public LineProcessingResult(String ipAddress) {
        this.ipAddress = ipAddress;
    }
    
    public String getIpAddress() {
        return ipAddress;
    }
    
    Run Code Online (Sandbox Code Playgroud)

    }

4.Logline消息

public class LogLineMessage {

    private String data;

    public LogLineMessage(String data) {
        this.data = data;
    }

    public String getData() {
        return data;
    }
}
Run Code Online (Sandbox Code Playgroud)

我正在为文件中的每一行创建一个演员。

Ram*_*gil 6

对于所有并发框架,在部署的并发量与每个并发单元所涉及的复杂性之间总是存在权衡。阿卡也不例外。

在您的非 akka 方法中,每行都有一个相对简单的步骤序列:

  1. 从文件中读取一行
  2. 用'-'分割线
  3. 将 IP 地址提交到哈希图中并增加计数

相比之下,每一行的 akka 方法要复杂得多:

  1. 创建一个 Actor
  2. 创建LogLineMessage消息
  3. 将消息发送给演员
  4. 用'-'分割线
  5. 创建LineProcessingResult消息
  6. 将消息发送回协调参与者
  7. 将 IP 地址提交到哈希图中并增加计数

如果我们天真地假设上述每个步骤都花费相同的时间,那么您将需要 2 个带有 akka 的线程才能以与没有 akka 的 1 个线程相同的速度运行。

让每个并发单元做更多的工作

不是Actor每 1 行 1 行,而是让每个 Actor 将 N 行处理到自己的子哈希图中(例如,每个 Actor 处理 1000 行):

public class LogLineMessage {

    private String[] data;

    public LogLineMessage(String[] data) {
        this.data = data;
    }

    public String[] getData() {
        return data;
    }
}
Run Code Online (Sandbox Code Playgroud)

那么 Actor 就不会发回像 IP 地址这样简单的东西了。相反,它将为其行子集发送计数的散列:

public class LineProcessingResult {

    private HashMap<String, Long> ipAddressCount;

    public LineProcessingResult(HashMap<String, Long> count) {
        this.ipAddressCount = Count;
    }

    public HashMap<String, Long> getIpAddress() {
        return ipAddressCount;
    }
}
Run Code Online (Sandbox Code Playgroud)

协调 Actor 可以负责组合所有不同的子计数:

//inside of FileAnalysisActor
else if (message instanceof LineProcessingResult) {
    HashMap<String,Long>  localCount = ((LineProcessingResult) message).getIpAddressCount();

    localCount.foreach((ipAddress, count) -> {
        ipMap.put(ipAddress, ipMap.getOrDefault(ipAddress, 0L) + count);
    })
Run Code Online (Sandbox Code Playgroud)

然后您可以改变 N 以查看您的特定系统在何处获得最佳性能。

不要将整个文件读入内存

并发解决方案的另一个缺点是它首先将整个文件读入内存。这对 JVM 来说是不必要的并且是繁重的。

相反,一次读取文件 N 行。如前所述,一旦您在内存中将这些行从 Actor 中生成。

FileReader fr = new FileReader(file);
BufferedReader br = new BufferedReader(fr);

String[] lineBuffer;
int bufferCount = 0;
int N = 1000;

String line = br.readLine();

while(line!=null) {
    if(0 == bufferCount)
      lineBuffer = new String[N];
    else if(N == bufferCount) {
      Props props = Props.create(LogLineProcessor.class);
      ActorRef lineProcessorActor = this.getContext().actorOf(props);

      lineProcessorActor.tell(new LogLineMessage(lineBuffer),
                              this.getSelf());

      bufferCount = 0;
      continue;
    }

    lineBuffer[bufferCount] = line;
    br.readLine();
    bufferCount++;
}

//handle the final buffer
if(bufferCount > 0) {
    Props props = Props.create(LogLineProcessor.class); 
    ActorRef lineProcessorActor = this.getContext().actorOf(props);

    lineProcessorActor.tell(new LogLineMessage(lineBuffer),
                            this.getSelf());
}
Run Code Online (Sandbox Code Playgroud)

这将允许文件 IO、行处理和子图合并并行运行。