如何访问 apache_beam.io.fileio.ReadableFile() 对象?

DZv*_*vig 1 python google-cloud-platform google-cloud-dataflow apache-beam

我正在尝试使用该apache_beam.io.fileio模块来读取文件lines.txt并将其合并到我的管道中。

lines.txt有以下内容:

line1
line2
line3
Run Code Online (Sandbox Code Playgroud)

当我运行以下管道代码时:

with beam.Pipeline(options=pipeline_options) as p:

     lines = (
         p
         | beam.io.fileio.MatchFiles(file_pattern="lines.txt")
         | beam.io.fileio.ReadMatches()
     )
     # print file contents to screen
     lines | 'print to screen' >> beam.Map(print)
Run Code Online (Sandbox Code Playgroud)

我得到以下输出:

<apache_beam.io.fileio.ReadableFile object at 0x000001A8C6C55F08>
Run Code Online (Sandbox Code Playgroud)

我期望

line1
line2
line3
Run Code Online (Sandbox Code Playgroud)

我怎样才能达到我预期的结果?

DZv*_*vig 6

所得结果PCollection

p
| beam.io.fileio.MatchFiles(file_pattern="lines.txt")
| beam.io.fileio.ReadMatches()
Run Code Online (Sandbox Code Playgroud)

是一个ReadableFile对象。为了访问这个对象,我们可以使用apache beam pydoc中记录的各种函数。

下面我们实现read_utf8()

with beam.Pipeline(options=pipeline_options) as p:

    lines = (
        p
        | beam.io.fileio.MatchFiles(file_pattern="lines.txt")
        | beam.io.fileio.ReadMatches()
        | beam.Map(lambda file: file.read_utf8())
    )
    # print file contents to screen
    lines | 'print to screen' >> beam.Map(print)
Run Code Online (Sandbox Code Playgroud)

我们得到了预期的结果:

line1
line2
line3
Run Code Online (Sandbox Code Playgroud)