如何使用 Beam 读取大型 CSV?

Kri*_*ket 9 apache-beam

我想弄清楚如何使用 Apache Beam 读取大型 CSV 文件。我所说的“大”是指几 GB(因此一次将整个 CSV 读入内存是不切实际的)。

到目前为止,我已经尝试了以下选项:

  • 使用 TextIO.read():这不好,因为引用的 CSV 字段可能包含换行符。此外,这会尝试一次将整个文件读入内存。
  • 编写一个 DoFn,将文件作为流读取并发出记录(例如使用 commons-csv)。但是,这仍然一次读取整个文件。
  • 尝试使用此处所述的 SplittableDoFn 。我的目标是让它逐渐将记录作为无界 PCollection 发出 - 基本上,将我的文件变成记录流。但是,(1) 很难正确计算 (2) 由于 ParDo 创建了多个线程,因此它需要一些复杂的同步,并且 (3) 我生成的 PCollection 仍然不是无限的。
  • 尝试创建我自己的 UnboundedSource。这似乎非常复杂且记录不足(除非我遗漏了什么?)。

Beam 是否提供了任何简单的方法来让我以我想要的方式解析文件,而不必在继续下一个转换之前将整个文件读入内存?

And*_*oud 0

从 Beam 的角度来看,TextIO 应该做正确的事情,即尽快读取文本文件并将事件发送到下一阶段。

我猜您正在为此使用 DirectRunner,这就是您看到大量内存占用的原因。希望这不是太多的解释:DirectRunner 是小型作业的测试运行器,因此它将中间步骤缓冲在内存中而不是缓冲到磁盘中。如果您仍在测试管道,则应该使用一小部分数据样本,直到您认为它有效为止。然后,您可以使用 Apache Flink 运行程序或 Google Cloud Dataflow 运行程序,它们都会在需要时将中间阶段写入磁盘。

  • DirectRunner 不仅位于本地,而且仅位于内存中。DirectRunner 缺乏在数据读取速度快于处理速度时提供背压的能力,并且也缺乏将中间数据缓冲到磁盘的能力。TextIO 一次从文件中读取一条记录,但它非常高效,并且可能同时有多个线程读取:https://github.com/apache/beam/blob/391f63d751fff2198729195e1caf155fabbbe0b7/sdks/java/core/src/main /java/org/apache/beam/sdk/io/FileBasedSource.java#L413 (3认同)