使用Apache Beam以CSV格式将BigQuery结果写入GCS

Har*_*ari 2 python google-bigquery google-cloud-dataflow apache-beam

我是Apache Beam的新手,我在其中尝试编写管道以从Google BigQuery提取数据,然后使用Python将数据以CSV格式写入GCS。

使用,beam.io.read(beam.io.BigQuerySource())我能够从BigQuery读取数据,但不确定如何将其以CSV格式写入GCS。

是否有实现相同功能的自定义功能,能否请您帮我吗?

import logging

import apache_beam as beam


PROJECT='project_id'
BUCKET='project_bucket'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
]

with beam.Pipeline(argv=argv) as p:

    # Execute the SQL in big query and store the result data set into given Destination big query table.
    BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
        beam.io.BigQuerySource(query =  'Select * from `dataset.table`', use_standard_sql=True))
    # Extract data from Bigquery to GCS in CSV format.
    # This is where I need your help

    BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
            table='tablename',
            dataset='datasetname',
            project='project_id',
            schema='name:string,gender:string,count:integer',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)

if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   run()
Run Code Online (Sandbox Code Playgroud)

Gui*_*ins 5

您可以通过WriteToText添加.csv后缀和来实现headers。考虑到您需要将查询结果解析为CSV格式。作为示例,我使用了莎士比亚公共数据集和以下查询:

从`bigquery-public-data.samples.shakespeare`中选择单词,单词数,语料库,其中CHAR_LENGTH(word)> 3按单词数计数排序限制10

现在,我们通过以下方式读取查询结果:

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))
Run Code Online (Sandbox Code Playgroud)

BQ_DATA 现在包含键值对:

{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}
Run Code Online (Sandbox Code Playgroud)

我们可以应用一个beam.Map函数只产生值:

{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}
Run Code Online (Sandbox Code Playgroud)

摘录BQ_VALUES

[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]
Run Code Online (Sandbox Code Playgroud)

最后再次映射,使所有列值用逗号而不是列表分开(考虑到如果双引号可以出现在字段中,则需要转义双引号):

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())
Run Code Online (Sandbox Code Playgroud)

现在,我们将结果后缀和标头写入GCS:

[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]
Run Code Online (Sandbox Code Playgroud)

书面结果:

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))
Run Code Online (Sandbox Code Playgroud)