Tho*_* W. 1 python dataflow apache-beam
我正在使用 Python SDK 2.15.0 构建数据流管道。在此管道中,我需要在管道的多个阶段将附加数据加入到每个元素。
所有这些附加数据都是从 Google Cloud Storage 上的 avro 文件中读取的(Dataflow 和 GCS 存储桶使用的同一区域),使用 map 函数将其组织为键值元组,然后使用 pvalue.AsDict( 作为侧输入传递到 DoFn )。侧面输入数据在管道执行期间不会改变。
第一次连接(侧面输入大小 ~ 1 MB)进行得非常顺利。然而,第二次连接确实表现不佳。它的 sideinput 大小约为 50 MB。
数据流执行图清楚地显示了导致性能不佳的原因:我的 ParDo 步骤消耗的大约 90% 的时间都花在了读取侧面输入上。即使我只使用四个工作节点,从 sideinput 读取的数据量也超出了其实际大小几个数量级。
我能做些什么来防止这种情况发生吗?我是否需要以某种方式配置工作缓存大小?在我的 DoFn 的设置方法中准备附加数据而不是将其作为 sideinput 传递会更好吗?
以下是我准备侧面输入的方法:
sideinput_1 = pvalue.AsDict(p | "Read side input data 1" >> beam.io.ReadFromAvro("gs:/bucket/small_file.avro",0,False,True) \
| "Prepare sideinput 1" >> beam.Map(lambda x: (x["KEY"],x["VALUE"])))
# Preparing data for later join
sideinput_2 = pvalue.AsDict(p | "Read side input data 2" >> beam.io.ReadFromAvro("gs://bucket/bigger_file.avro",0,False,True) \
| "Prepare side input data 2" >> beam.Map(lambda x: ((x["KEYCOL1"],x["KEYCOL2"],x["KEYCOL3"]),x)))
Run Code Online (Sandbox Code Playgroud)
使用侧面输入:
matching = p | "Read address data" >> beam.io.Read(beam.io.BigQuerySource(query=sql_addr, use_standard_sql=True)) \
| "Join w/ sideinput1" >> beam.ParDo(Join1(), sideinput_1 ).with_outputs('unmatched', main='matched')
result = matching["matched"] | "Join Sideinput 2" >> beam.ParDo(Join2(), sideinput_2 )
Run Code Online (Sandbox Code Playgroud)
DoFn 处理方法仅包含在侧输入中查找键,并根据是否存在匹配,向元素添加一些附加数据。
好吧,几个月后经过讨论,有了经验,让我再尝试一下:
\n\n我非常确定侧面输入的性能问题归结为内存交换问题。在管道中,还有一些其他连接非常相似,但侧输入要小得多。他们以合理的时间跑完。然而,所有这些连接的比率(IO 字节/侧输入字节)大致相等。
\n\n当我将实现从 ParDo with SideInput 切换到CoGroupByKey Transform时,受影响连接的性能提高了几个数量级。
\n\n关于侧输入的大小以及何时使用 SideInput 更喜欢 CoGroupByKey 而不是 DoFn 的更多信息:
\n\n优秀的博客文章“常见云数据流用例模式指南”指出,人们可以使用 ParDo 进行流式处理中高达 100 MB 的 SideInputs 和批处理模式下高达 1 GB 的 SideInputs:
\n\n\n\n\n注意:如果可能,请对连接表之一实际上很小 \xe2\x80\x94 在流模式下大约 100MB 或在批处理模式下小于 1GB 的任何活动使用 SideInputs。这会表现得更好[...]。
\n
我想没有适合每种情况的通用阈值。可能在很大程度上取决于您的管道、机器类型和工人数量等。就我而言,我认为由于管道的高度复杂性,阈值较低。它由约 40 个转换组成,其中包括多个连接。
\n\n因此,如果您在使用 ParDo 和 Sideinput 进行连接时遇到同样的问题,您可能需要尝试一下 CoGroupByKey-Transform。
\n