use*_*640 6 python csv google-bigquery google-cloud-dataflow apache-beam
我想阅读一个csv文件,并使用apache beam dataflow将其写入BigQuery.为了做到这一点,我需要以字典的形式向BigQuery提供数据.如何使用apache beam转换数据才能执行此操作?
我的输入csv文件有两列,我想在BigQuery中创建一个后续的两列表.我知道如何在BigQuery中创建数据,这是直接的,我不知道的是如何将csv转换为字典.下面的代码不正确,但应该知道我正在尝试做什么.
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
Run Code Online (Sandbox Code Playgroud)
Pab*_*blo 22
我们的想法是拥有一个返回解析后的CSV行的源代码.您可以通过继承fileio类来包含CSV解析来完成此操作.特别是,FileBasedSource函数看起来像这样:
def get_csv_reader(readable_file):
# You can return whichever kind of reader you want here
# a DictReader, or a normal csv.reader.
if sys.version_info >= (3, 0):
return csv.reader(io.TextIOWrapper(readable_file.open()))
else:
return csv.reader(readable_file.open())
with Pipeline(...) as p:
content_pc = (p
| beam.io.fileio.MatchFiles("/my/file/name")
| beam.io.fileio.ReadMatches()
| beam.Reshuffle() # Useful if you expect many matches
| beam.FlatMap(get_csv_reader))
Run Code Online (Sandbox Code Playgroud)
我最近read_records为Apache Beam 写了一篇文章.您可以查看Github存储库.您可以使用fileio和FileBasedSource使用它.read_records还包括用于设置自定义分隔符,跳过文件头和/或输出词典而不是列表的选项.
作为对巴勃罗帖子的补充,我想分享一下我自己对他的样本所做的一些改变。(为你+1!)
变成
reader = csv.reader(self._file)reader = csv.DictReader(self._file)
使用csv.DictReaderCSV 文件的第一行作为 Dict 键。其他行用于用其值填充每行的字典。它会根据列顺序自动将正确的值放入正确的键中。
一个小细节是 Dict 中的每个值都存储为字符串。如果您使用例如,这可能会与您的 BigQuery 架构发生冲突。某些字段为 INTEGER。因此,您需要在之后进行适当的铸造。