Ric*_*omi 3 python google-cloud-dataflow apache-beam
我正在研究一个从 Google Cloud Storage (GCS) 目录读取约 500 万个文件的管道。我已将其配置为在 Google Cloud Dataflow 上运行。
问题是,当我启动管道时,需要几个小时来“计算所有文件的大小”:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
Run Code Online (Sandbox Code Playgroud)
如您所见,计算大约 550 万个文件的大小花了一个半小时(5549 秒),然后又从头开始!运行第二遍又用了2个小时,然后第三遍就开始了!在撰写本文时,该作业在 Dataflow 控制台中仍然不可用,这让我相信这一切都发生在我的本地机器上,并没有利用任何分布式计算。
当我使用较小的输入数据集(2 个文件)测试管道时,它会重复大小估计 4 次:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.
Run Code Online (Sandbox Code Playgroud)
按照这个速度,仅在 Dataflow 作业开始之前对所有 5.5M 文件执行 GCS 大小估计 4 次就需要大约 8 小时。
我的管道配置了该--runner=DataflowRunner选项,因此它应该在 Dataflow 中运行:
python bigquery_import.py --runner=DataflowRunner #other options...
Run Code Online (Sandbox Code Playgroud)
管道从 GCS 读取,如下所示:
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
required=True,
help='Input Cloud Storage directory to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')
Run Code Online (Sandbox Code Playgroud)
有关完整代码,请参阅GitHub 上的bigquery_import.py。
我很困惑为什么这个乏味的过程发生在 Dataflow 环境之外,为什么需要多次完成。我是从 GCS 正确读取文件还是有更有效的方法?
感谢您报告此事。Beam 有两种用于阅读文本的变换。ReadFromText和ReadAllFromText。ReadFromText会遇到这个问题,但ReadAllFromText不应该。
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438
缺点ReadAllFromText是它不会执行动态工作重新平衡,但是在读取大量文件时这应该不是问题。
创建https://issues.apache.org/jira/browse/BEAM-9620用于跟踪 ReadFromText(以及一般基于文件的源)的问题。
| 归档时间: |
|
| 查看次数: |
342 次 |
| 最近记录: |