从Pipeline中的PCollection GCS文件名中读取文件?

Isa*_*row 7 google-cloud-dataflow

我有一个连接到pub/sub的流管道,它发布了GCS文件的文件名.从那里我想读取每个文件并解析每一行上的事件(事件是我最终想要处理的事件).

我可以使用TextIO吗?当在执行期间定义文件名时,您是否可以在流管道中使用它(而不是使用TextIO作为源,并且fileName(s)在构造时已知).如果不是,我正在考虑做以下事情:

从pub/sub ParDo获取主题以读取每个文件并获取行处理文件的行...

我可以使用FileBasedReader或类似的东西来读取文件吗?文件不是太大,所以我不需要并行读取单个文件,但我需要读取大量文件.

jkf*_*kff 5

您可以使用TextIO.readAll()最近在#3443 中添加到 Beam的变换。例如:

PCollection<String> filenames = p.apply(PubsubIO.readStrings()...);
PCollection<String> lines = filenames.apply(TextIO.readAll());
Run Code Online (Sandbox Code Playgroud)

这将读取通过 pubsub 到达的每个文件中的所有行。