有没有办法使用ReadFromText转换(Python)读取Apache Beam中的多行csv文件?

Bra*_*don 5 python google-cloud-platform google-cloud-dataflow apache-beam apache-beam-io

有没有办法ReadFromText在Python中使用转换读取多行csv文件?我有一个文件,其中包含一行,我试图让Apache Beam将输入读作一行,但无法让它工作.

def print_each_line(line):
    print line

path = './input/testfile.csv'
# Here are the contents of testfile.csv
# foo,bar,"blah blah
# more blah blah",baz

p = apache_beam.Pipeline()

(p
 | 'ReadFromFile' >> apache_beam.io.ReadFromText(path)
 | 'PrintEachLine' >> apache_beam.FlatMap(lambda line: print_each_line(line))
 )

# Here is the output:
# foo,bar,"blah blah
# more blah blah",baz
Run Code Online (Sandbox Code Playgroud)

上面的代码将输入解析为两行,即使多行csv文件的标准是将多行元素包装在双引号内.

小智 2

Beam 不支持解析 CSV 文件。不过,您可以使用 Python 的 csv.reader。这是一个例子:

import apache_beam
import csv

def print_each_line(line):
  print line

p = apache_beam.Pipeline()

(p 
 | apache_beam.Create(["test.csv"])
 | apache_beam.FlatMap(lambda filename:
     csv.reader(apache_beam.io.filesystems.FileSystems.open(filename)))
 | apache_beam.FlatMap(print_each_line))

p.run()
Run Code Online (Sandbox Code Playgroud)

输出:

['foo', 'bar', 'blah blah\nmore blah blah', 'baz']
Run Code Online (Sandbox Code Playgroud)