有一些人和我一起从事一个项目,他们一直在努力寻找解决此问题的最佳方法。看来这应该是经常需要的标准事物,但是由于某些原因,我们似乎无法获得正确的答案。
如果我有一些工作要做,并且在路由器上抛出一堆消息,我怎么知道什么时候完成了?例如,如果我们正在读取一百万行文件的行并将该行发送给演员以进行处理,而您需要处理下一个文件,但是必须等待第一个文件完成,那么如何知道它何时已经完成?
另一条评论。我知道并已将Await.result()和Await.ready()与Patters.ask()一起使用。一个区别是,每条线都有一个期货,而我们将有大量等待这些期货的数组,而不仅仅是一个。此外,我们正在填充占用大量内存的大型域模型,并且不希望添加额外的内存来在等待组成的内存中保存相等数量的期货,而使用actor时每个参与者在完成工作后都会完成而不保存内存组成。
我们使用的是Java,而不是Scala。
伪代码:
for(File file : files) {
...
while((String line = getNextLine(fileStream)) != null) {
router.tell(line, this.getSelf());
}
// we need to wait for this work to finish to do the next
// file because it's dependent on the previous work
}
Run Code Online (Sandbox Code Playgroud)
似乎您经常需要做很多工作,并知道何时与演员结盟。
我相信我有一个适合你的解决方案,它不涉及积累一大堆Futures。一是理念层次高。将有两名演员参与此流程。第一个我们会打电话FilesProcessor。这位演员的寿命很短,但很有威严。每当您想要按顺序处理一堆文件时,您都可以启动该参与者的一个实例,并向其传递一条包含您想要处理的文件的名称(或路径)的消息。当它完成所有文件的处理后,它会自行停止。我们要呼叫的第二个演员LineProcessor。这个参与者是无状态的、长期存在的并且集中在路由器后面。它处理文件行,然后响应请求该行处理的任何人,告诉他们它已完成该行的处理。现在来看代码。
首先是消息:
public class Messages {
public static class ProcessFiles{
public final List<String> fileNames;
public ProcessFiles(List<String> fileNames){
this.fileNames = fileNames;
}
}
public static class ProcessLine{
public final String line;
public ProcessLine(String line){
this.line = line;
}
}
public static class LineProcessed{}
public static LineProcessed LINE_PROCESSED = new LineProcessed();
}
Run Code Online (Sandbox Code Playgroud)
还有FilesProcessor:
public class FilesProcessor extends UntypedActor{
private List<String> files;
private int awaitingCount;
private ActorRef router;
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof ProcessFiles){
ProcessFiles pf = (ProcessFiles)msg;
router = ... //lookup router;
files = pf.fileNames;
processNextFile();
}
else if (msg instanceof LineProcessed){
awaitingCount--;
if (awaitingCount <= 0){
processNextFile();
}
}
}
private void processNextFile(){
if (files.isEmpty()) getContext().stop(getSelf());
else{
String file = files.remove(0);
BufferedReader in = openFile(file);
String input = null;
awaitingCount = 0;
try{
while((input = in.readLine()) != null){
router.tell(new Messages.ProcessLine(input), getSelf());
awaitingCount++;
}
}
catch(IOException e){
e.printStackTrace();
getContext().stop(getSelf());
}
}
}
private BufferedReader openFile(String name){
//do whetever to load file
...
}
}
Run Code Online (Sandbox Code Playgroud)
还有LineProcessor:
public class LineProcessor extends UntypedActor{
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof ProcessLine){
ProcessLine pl = (ProcessLine)msg;
//Do whatever line processing...
getSender().tell(Messages.LINE_PROCESSED, getSelf());
}
}
}
Run Code Online (Sandbox Code Playgroud)
现在,线路处理器正在发送回没有附加内容的响应。如果您需要根据线路的处理发回某些内容,您当然可以更改此设置。我确信这段代码不是防弹的,我只是想向您展示一个高级概念,说明如何在没有请求/响应语义和Futures 的情况下完成此流程。
如果您对此方法有任何疑问或想要了解更多详细信息,请告诉我,我很乐意提供。
| 归档时间: |
|
| 查看次数: |
4069 次 |
| 最近记录: |