Reading CSVs and writing to BigQuery from Apache Beam

Pra*_*nda · 1 · python, google-bigquery, google-cloud-dataflow, apache-beam

I have a GCS bucket from which I'm trying to read about 200k files and then write them to BigQuery. The problem is that I'm having trouble creating a PCollection that works well with the code. I'm following this tutorial as a reference.

I have the following code:

from __future__ import absolute_import

import argparse
import logging
import os

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage

import regex as re

# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
files = ['gs://mybucket/_chunk1',
         'gs://mybucket/_chunk0']


class DataIngestion:
    """A helper class which contains the logic to translate the file into
    a format BigQuery will accept."""

    def parse_method(self, string_input):

        x="""{}""".format(string_input)
        rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")
        d = {}
        d['name'], d['date'], d['geometry'], d['value0'], d['value1'], d['value2']=rx.split(x)
        d['geometry']=d['geometry'].strip('"')

        return d

def run(argv=None):
    """Main entry point; defines and runs the pipeline."""

    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())


    (p
     | 'Create PCollection' >> beam.Create(files)
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)
     | 'String To BigQuery Row' >> beam.Map(
         lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             'mytable',
             dataset='mydataset',
             schema=myschema,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()


The problem is that this code works perfectly as long as the `files` list contains only one element. As soon as there is more than one element, the 'String To BigQuery Row' transform errors out with `error: nothing to repeat [while running 'String To BigQuery Row']`. This probably has something to do with the `regex` module, but I can't figure out what's wrong, because it works perfectly when given a single file.

EDIT: Strangely, it runs fine with the DirectRunner. I'm passing a requirements.txt file as shown here.

This is how I execute the pipeline:

python streaming_inserts.py --runner=DataFlowRunner --project=my-project --temp_location=gs://temp/ --staging_location=gs://stage/  --requirements_file requirements.txt --disk_size_gb 1000 --region us-east1

My requirements.txt looks like this:

regex
google-cloud-storage

Also, according to the logs, the packages are being installed (screenshot of the worker logs omitted).

use*_*062 5

The OP's comment made me realize my own mistake: the intended library, `regex`, is not Python's built-in `re` library.

Using `import regex as re` is not only confusing, it also causes the `re` library to raise the `nothing to repeat` error. That's because, by default, Dataflow does not save your main session.

When the code in your parse function executes on the workers, it doesn't have access to the context in which `re` was imported at pipeline construction time. Normally this would fail with a `NameError`, but because you're using a valid library name, the code assumes you mean the built-in `re` library and tries to execute it as such.

If you had used `import regex` instead, you would have seen `NameError: name 'regex' is not defined`, which is the real reason the code fails. To fix this, either move the import statement into the parse function itself, or pass `--save_main_session` as an option to the runner. See here for more details.
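
For illustration, here is a minimal sketch of both fixes, reusing the class from the question (the field names come from the question; the rest is placeholder):

# Fix 1: import regex inside the method that actually runs on the workers,
# so the dependency is resolved at execution time rather than at pipeline
# construction time.
class DataIngestion:
    def parse_method(self, string_input):
        import regex  # resolved on the worker; no reliance on the main session
        rx = regex.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")
        d = {}
        (d['name'], d['date'], d['geometry'],
         d['value0'], d['value1'], d['value2']) = rx.split(string_input)
        d['geometry'] = d['geometry'].strip('"')
        return d

# Fix 2: keep the module-level `import regex` and have Dataflow pickle the
# main session, so top-level imports are available on the workers.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions()
options.view_as(SetupOptions).save_main_session = True  # same effect as --save_main_session
p = beam.Pipeline(options=options)

With either change, the workers resolve `regex` correctly and the `nothing to repeat` error from the built-in `re` goes away.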


Old answer:

While I can't tell which version of Python you're using, your suspicion about the regular expression seems correct. `*` is a special character meaning "repeat what came before", but `(` is a special character that opens a group, so `(*SKIP)` does not look syntactically valid.

In Python 3.7, the expression above doesn't even compile:

python -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 234, in compile
    return _compile(pattern, flags)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 286, in _compile
    p = sre_compile.compile(pattern, flags)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_compile.py", line 764, in compile
    p = sre_parse.parse(p, flags)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 930, in parse
    p = _parse_sub(source, pattern, flags & SRE_FLAG_VERBOSE, 0)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
    not nested and not items))
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 816, in _parse
    p = _parse_sub(source, state, sub_verbose, nested + 1)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
    not nested and not items))
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 651, in _parse
    source.tell() - here + len(this))
re.error: nothing to repeat at position 11

Python 2.7.15 doesn't accept it either:

python2 -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/lib/python2.7/re.py", line 194, in compile
    return _compile(pattern, flags)
  File "/usr/lib/python2.7/re.py", line 251, in _compile
    raise error, v # invalid expression
sre_constants.error: nothing to repeat

While I don't know which strings you're trying to match, I suspect some of your characters need escaping, e.g. "\{[^{}]+\}(\*SKIP)(\*FAIL)|,".
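
As the updated answer above explains, the pattern itself is valid for the third-party `regex` library, which supports the PCRE control verbs `(*SKIP)` and `(*FAIL)`; only the built-in `re` module rejects it. A quick local check, using a made-up input row shaped like the fields in the question:

import regex

# regex understands the control verbs: a comma inside {...} is skipped,
# so only commas outside braces act as field delimiters.
rx = regex.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")
print(rx.split('a,2018-01-01,"{1,2}",3,4,5'))
# ['a', '2018-01-01', '"{1,2}"', '3', '4', '5']

# The built-in re module rejects the very same pattern.
import re
try:
    re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")
except re.error as exc:
    print(exc)  # nothing to repeat at position 11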

  • Just noticed my confusion between `re` and `regex` and updated my answer accordingly. I'd suggest using `import regex` going forward to avoid the confusion. (2 upvotes)