为什么我需要调整我的 PCollection 以使其在 Cloud Dataflow 上自动缩放?

bst*_*str 5 python google-cloud-dataflow apache-beam

语境

我正在使用如下所示的过程从 Beam 中的 Google Storage 读取文件:

data = pipeline | beam.Create(['gs://my/file.pkl']) | beam.ParDo(LoadFileDoFn)
Run Code Online (Sandbox Code Playgroud)

WhereLoadFileDoFn加载文件并从中创建对象的 Python 列表,ParDo然后以PCollection.

我知道我可能会实现一个自定义源来实现类似的功能,但是这个答案和 Beam自己的文档表明这种使用伪数据集读取的ParDo方法并不少见,并且自定义源可能有点矫枉过正。

它也有效 - 我得到了PCollection正确数量的元素,我可以随心所欲地处理它!然而..

自动缩放问题

结果PCollection在 Cloud Dataflow 上根本不会自动缩放。我首先必须通过以下方式对其进行转换:

shuffled_data = data | beam.Shuffle()
Run Code Online (Sandbox Code Playgroud)

我知道我也在上面链接的这个答案几乎解释了这个过程 - 但它没有提供任何关于为什么这是必要的见解。就我在 Beam 非常高的抽象级别所看到的而言,我PCollection在洗牌之前有 N 个元素,在洗牌PCollection之后有一个类似的元素。为什么一个可以缩放,而另一个不缩放?

在这种情况下,文档不是很有帮助(或者一般来说,但这是另一回事)。第一个PCollection有什么隐藏属性可以防止它被分发给另一个没有的多个工作人员?

Eri*_*idt 7

当您通过 Create 阅读时,您正在创建一个绑定到 1 个工人的 PCollection。由于没有与项目关联的密钥,因此没有分发工作的机制。Shuffle() 将在封面下创建一个 K,V,然后随机播放,这使得 PCollection 项目能够在新工人旋转时分发给他们。您可以通过关闭自动缩放并将工作人员大小固定为 25 来验证此行为 - 如果没有 Shuffle,您将只会看到 1 个工作人员在工作。

在创建/读取时分发此工作的另一种方法是构建您自己的自定义 I/O 以读取 PKL 文件1。您将创建适当的拆分器;但是,不知道您腌制了什么,它可能无法拆分。IMO Shuffle() 是一个安全的赌注,模数你可以通过编写一个可拆分的阅读器来获得优化。