使用 Vert.x 逐行读取文件(AsyncFile 和 RecordParser 帮助)

leb*_*olo 3 asynchronous inputstream vert.x

我正在尝试使用 Vert.x 从文件系统读取一个大文件并逐行处理它。从核心文档来看,我认为做到这一点的方法是通过 anAsyncFile和 a RecordParser。理想情况下,我想要Pump数据(以避免背压),但RecordParser不是WriteStream

AsyncFile asyncFile = vertx.fileSystem().openBlocking(/*path and options*/);

RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
  // Do something per line
});

Pump.pump(asyncFile, recordParser).start(); // Error - RecordParser cannot be converted to WriteStream
Run Code Online (Sandbox Code Playgroud)

所以我想我必须自己抽奶?我尝试过类似的东西:

RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
  // Do something per line
  // I can see this code get run
})
.exceptionHandler(cause -> {
  // Do I need this? What are the repercussions if I don't have this handler? Are exceptions just lost?
})
.endHandler(_void -> {
  // This never gets called!
});

asyncFile.handler(recordParser); // Crazy Java8 syntax passes recordParser.handle to this handler :)
Run Code Online (Sandbox Code Playgroud)

但是,我不确定我的文件是否已关闭,因为recordParser.endHandler永远不会被调用(尽管我可以看到正在调用行处理程序)。

我究竟做错了什么?文件没有被关闭吗?我尝试添加endHandlerasyncFile关闭它,但这不起作用。

理想情况下,我宁愿去Pump工作。这种场景下有什么办法可以使用吗Pump

提前致谢!

PS:请原谅“十字路口”

tse*_*ont 5

你可以按照AsyncFileRecordParser所说的做。

RecordParser recordParser = RecordParser.newDelimited("\n", bufferedLine -> {
  System.out.println("bufferedLine = " + bufferedLine);
});

asyncFile.handler(recordParser)
    .endHandler(v -> {
      asyncFile.close();
      System.out.println("Done");
    });
Run Code Online (Sandbox Code Playgroud)

如果您这样做,您应该在 上设置异常处理程序AsyncFile,而不是在RecordParser.

使用上面的代码,文件将被正确关闭。

但是,我不确定我的文件是否已关闭,因为 recordParser.endHandler 永远不会被调用(尽管我可以看到正在调用行处理程序)。

实际上,只有当endHandlerthe是通过包装另一个来创建exceptionHandler时才会被调用RecordParserReadStream。就您而言,它们不是必需的。

另外,关于您的评论:

// 疯狂的 Java8 语法将 recordParser.handle 传递给这个处理程序:)

这不是 Java 8 的技巧,它只是AsyncFile.handler()需要一个Handler<Buffer>RecordParser实现这个接口。

  • 如果要将缓冲区传输到“WriteStream”(如另一个文件或 HTTP 服务器响应),则需要“Pump”。在这种情况下,您可以通过[包装](https://vertx.io/docs/apidocs/io/vertx/core/parsetools/RecordParser.html#newDelimited-java.lang.String-io)创建“RecordParser”。 vertx.core.streams.ReadStream-) `AsyncFile`。然后,“RecordParser”可以用作具有“Pump”的任何其他“ReadStream”。 (2认同)