Ric*_*ZCO 2 dataflow google-cloud-platform apache-beam
我正在运行 Apache Beam 管道,从 Google Cloud Storage 读取文本文件,对这些文件执行一些解析并将解析后的数据写入 Bigquery。
为了保持简短,这里忽略解析和 google_cloud_options,我的代码如下:(apache-beam 2.5.0 with GCP add-ons and Dataflow as runner)
p = Pipeline(options=options)
lines = p | 'read from file' >>
beam.io.ReadFromText('some_gcs_bucket_path*') | \
'parse xml to dict' >> beam.ParDo(
beam.io.WriteToBigQuery(
'my_table',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
p.run()
Run Code Online (Sandbox Code Playgroud)
这运行良好并成功地将相关数据附加到我的 Bigquery 表中以获取少量输入文件。但是,当我将输入文件的数量增加到 +- 800k 时,出现错误:
“BoundedSource.split() 操作返回的 BoundedSource 对象的总大小大于允许的限制。”
我发现故障排除 apache 光束管道导入错误 [BoundedSource 对象大于允许的限制]建议使用 ReadAllFromText 而不是 ReadFromText。
但是,当我换出时,出现以下错误:
p = Pipeline(options=options)
lines = p | 'read from file' >>
beam.io.ReadFromText('some_gcs_bucket_path*') | \
'parse xml to dict' >> beam.ParDo(
beam.io.WriteToBigQuery(
'my_table',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
p.run()
Run Code Online (Sandbox Code Playgroud)
有什么建议?
我面临着同样的问题。正如理查特提beam.Create到的,必须明确调用。另一个挑战是如何将此模式与模板参数一起使用,因为beam.Create仅支持文档中描述的内存中数据。
在这种情况下,Google Cloud 支持帮助了我,我想与您分享解决方案。诀窍是使用虚拟字符串创建管道,然后使用映射 lambda 在运行时读取输入:
class AggregateOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
help='Path of the files to read from')
parser.add_value_provider_argument(
'--output',
help='Output files to write results to')
def run():
logging.info('Starting main function')
pipeline_options = PipelineOptions()
pipeline = beam.Pipeline(options=pipeline_options)
options = pipeline_options.view_as(AggregateOptions)
steps = (
pipeline
| 'Create' >> beam.Create(['Start']) # workaround to kickstart the pipeline
| 'Read Input Parameter' >> beam.Map(lambda x: options.input.get()) # get the real input param
| 'Read Data' >> beam.io.ReadAllFromText()
| # ... other steps
Run Code Online (Sandbox Code Playgroud)
希望这个回答有帮助。
| 归档时间: |
|
| 查看次数: |
3402 次 |
| 最近记录: |