leb*_*olo 3 asynchronous inputstream vert.x
我正在尝试使用 Vert.x 从文件系统读取一个大文件并逐行处理它。从核心文档来看,我认为做到这一点的方法是通过 anAsyncFile和 a RecordParser。理想情况下,我想要Pump数据(以避免背压),但RecordParser不是WriteStream:
AsyncFile asyncFile = vertx.fileSystem().openBlocking(/*path and options*/);
RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
// Do something per line
});
Pump.pump(asyncFile, recordParser).start(); // Error - RecordParser cannot be converted to WriteStream
Run Code Online (Sandbox Code Playgroud)
所以我想我必须自己抽奶?我尝试过类似的东西:
RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
// Do something per line
// I can see this code get run
})
.exceptionHandler(cause -> {
// Do I need this? What are the repercussions if I don't have this handler? Are exceptions just lost?
})
.endHandler(_void -> {
// This never gets called!
});
asyncFile.handler(recordParser); // Crazy Java8 syntax passes recordParser.handle to this handler :)
Run Code Online (Sandbox Code Playgroud)
但是,我不确定我的文件是否已关闭,因为recordParser.endHandler永远不会被调用(尽管我可以看到正在调用行处理程序)。
我究竟做错了什么?文件没有被关闭吗?我尝试添加endHandler并asyncFile关闭它,但这不起作用。
理想情况下,我宁愿去Pump工作。这种场景下有什么办法可以使用吗Pump?
提前致谢!
PS:请原谅“十字路口”。
你可以按照AsyncFile你RecordParser所说的做。
RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
System.out.println("bufferedLine = " + bufferedLine);
});
asyncFile.handler(recordParser)
.endHandler(v -> {
asyncFile.close();
System.out.println("Done");
});
Run Code Online (Sandbox Code Playgroud)
如果您这样做,您应该在 上设置异常处理程序AsyncFile,而不是在RecordParser.
使用上面的代码,文件将被正确关闭。
但是,我不确定我的文件是否已关闭,因为 recordParser.endHandler 永远不会被调用(尽管我可以看到正在调用行处理程序)。
实际上,只有当endHandlerthe是通过包装另一个来创建exceptionHandler时才会被调用RecordParserReadStream。就您而言,它们不是必需的。
另外,关于您的评论:
// 疯狂的 Java8 语法将 recordParser.handle 传递给这个处理程序:)
这不是 Java 8 的技巧,它只是AsyncFile.handler()需要一个Handler<Buffer>并RecordParser实现这个接口。