Avoid recomputing the size of all Cloud Storage files in the Beam Python SDK

Ric*_*omi 3 python google-cloud-dataflow apache-beam

I'm working on a pipeline that reads ~5 million files from a Google Cloud Storage (GCS) directory. I have it configured to run on Google Cloud Dataflow.

The problem is that when I launch the pipeline, it takes hours to "compute the size" of all the files:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]

As you can see, it took an hour and a half (5,549 seconds) to compute the size of roughly 5.5 million files, and then it started all over again! The second pass took another 2 hours, after which it started a third time! As of this writing, the job is still not available in the Dataflow console, which leads me to believe that all of this is happening on my local machine and not taking advantage of any distributed computing.

When I test the pipeline with a smaller input dataset (2 files), it repeats the size estimation 4 times:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.

At this rate, it would take about 8 hours just to perform the GCS size estimation of all 5.5M files 4 times, before the Dataflow job even starts.

My pipeline is configured with the `--runner=DataflowRunner` option, so it should be running in Dataflow:

python bigquery_import.py --runner=DataflowRunner #other options...

The pipeline reads from GCS as follows:

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',
    required=True,
    help='Input Cloud Storage directory to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=pipeline_options) as p:
    files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')

See bigquery_import.py on GitHub for the complete code.

I'm confused as to why this tedious process happens outside of the Dataflow environment, and why it needs to be done multiple times. Am I reading the files from GCS correctly, or is there a more efficient way?

cha*_*ara 5

Thanks for reporting this. Beam has two transforms for reading text: ReadFromText and ReadAllFromText. ReadFromText will run into this issue, but ReadAllFromText should not.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438

The downside of ReadAllFromText is that it won't perform dynamic work rebalancing, but this should not be an issue when reading a large number of files.

Created https://issues.apache.org/jira/browse/BEAM-9620 to track this issue with ReadFromText (and file-based sources in general).

  • Got it working by changing `p | beam.io.ReadFromText(gcs_path)` to `p | beam.Create([gcs_path]) | beam.io.ReadAllFromText()`. The notable change is passing a list containing the glob string into the ReadAllFromText transform. Thanks so much for your help!