如何在apache beam dataflow中将csv转换为字典

use*_*640 6 python csv google-bigquery google-cloud-dataflow apache-beam

我想阅读一个csv文件,并使用apache beam dataflow将其写入BigQuery.为了做到这一点,我需要以字典的形式向BigQuery提供数据.如何使用apache beam转换数据才能执行此操作?

我的输入csv文件有两列,我想在BigQuery中创建一个后续的两列表.我知道如何在BigQuery中创建数据,这是直接的,我不知道的是如何将csv转换为字典.下面的代码不正确,但应该知道我正在尝试做什么.

# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
   beam.io.BigQuerySink(
   output_table,
   schema='month:INTEGER, tornado_count:INTEGER',
   create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
   write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
Run Code Online (Sandbox Code Playgroud)

Pab*_*blo 22

我们的想法是拥有一个返回解析后的CSV行的源代码.您可以通过继承fileio类来包含CSV解析来完成此操作.特别是,FileBasedSource函数看起来像这样:

def get_csv_reader(readable_file):
  # You can return whichever kind of reader you want here
  # a DictReader, or a normal csv.reader.
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with Pipeline(...) as p:
  content_pc = (p
                | beam.io.fileio.MatchFiles("/my/file/name")
                | beam.io.fileio.ReadMatches()
                | beam.Reshuffle()  # Useful if you expect many matches
                | beam.FlatMap(get_csv_reader))
Run Code Online (Sandbox Code Playgroud)

我最近read_records为Apache Beam 写了一篇文章.您可以查看Github存储库.您可以使用fileioFileBasedSource使用它.read_records还包括用于设置自定义分隔符,跳过文件头和/或输出词典而不是列表的选项.

  • 非常感谢 Pablo,这真的很好用!这是一个代码片段,以防人们正在寻找完整性 (p | 'read solar data' >> beam.Read(CsvFileSource('./sensor1_121116.csv')) | 'save' >> beam.Write(beam.io .TextFileSink('./greetings_solar'))) (2认同)
  • 也许有点晚了,但是玩了一段时间之后,我发现_completeness_代码应该是这个`(p |'read solar data'>> beam.io.Read(CsvFileSource('./ sensor1_121116.csv') )|'保存'>> beam.io.Write(beam.io.TextFileSink('./ greetings_solar')))`注意在应用程序调用中_beam_引用后插入的** io ** (2认同)

Mar*_*Dam 6

作为对巴勃罗帖子的补充,我想分享一下我自己对他的样本所做的一些改变。(为你+1!)

变成 reader = csv.reader(self._file)reader = csv.DictReader(self._file)

使用csv.DictReaderCSV 文件的第一行作为 Dict 键。其他行用于用其值填充每行的字典。它会根据列顺序自动将正确的值放入正确的键中。

一个小细节是 Dict 中的每个值都存储为字符串。如果您使用例如,这可能会与您的 BigQuery 架构发生冲突。某些字段为 INTEGER。因此,您需要在之后进行适当的铸造。