And*_*son 5 google-cloud-dataflow apache-beam
在Beam Python DoFn中进行昂贵的一次性初始化的推荐方法是什么?Java SDK具有DoFn.Setup,但在Beam Python中似乎没有等效的东西。
当前是threading.local()在DoFn初始化程序中将对象附加到的最佳方法吗?
sci*_*bor 10
数据流Python对于初始化昂贵对象的最佳方法并不是特别透明。有几种机制可以不频繁地实例化对象(目前不理想的是仅执行一次初始化)。以下概述了我进行的一些实验和得出的结论。希望Beam社区中的任何人都可以在我流浪的地方纠正我。
__init__
尽管该__init__方法可用于一次初始化一个昂贵的对象,但是这种初始化不会在Worker机器上发生。该对象将需要序列化才能发送给Worker,对于大型对象以及Tensorflow模型,Worker可能非常笨拙或根本无法工作。此外,由于此对象将被串行化并通过电线发送,因此在此处执行初始化并不安全,因为可以拦截有效载荷。建议不要使用此方法。
start_bundle()
数据流以离散的组处理数据,该组称为捆绑。这些在批处理过程中已经很好地定义,但是在流传输中,它们取决于吞吐量。没有用于配置Dataflow如何创建其捆绑包的机制,实际上捆绑包的大小完全由Dataflow决定。该start_bundle()方法将在Worker上调用,并可用于初始化状态,但是实验发现,在流上下文中,此方法的调用频率比期望的要高,并且昂贵的重新初始化将经常发生。
延迟初始化
Beam的方法提出了这种方法,这出人意料地是性能最高的。延迟初始化意味着您创建一些要初始化为的有状态参数None,然后执行如下代码:
if self.expensive_object is None:
self.expensive_object = self.__expensive_initialization()
Run Code Online (Sandbox Code Playgroud)
您可以直接在您的process()方法中执行此代码。您还可以轻松地将一些依赖于global状态的辅助函数放在一起,以便您可以使用诸如此类的功能(该示例的示例在本文的底部):
self.expensive_object = get_or_initialize_global(‘expensive_object’, self.__expensive_initialization)
Run Code Online (Sandbox Code Playgroud)
实验
在使用start_bundle上述延迟初始化方法和上述两种方法配置的作业上运行以下实验,并通过适当的日志记录来指示调用。将各种吞吐量发布到适当的队列中,并相应地记录结果。
在1毫秒/秒的速率下持续100秒:
Context Number of Invocations
------------------------------------------------------------
NEW BUNDLE 100
LAZY INITIALIZATION 25
TOTAL MESSAGES 100
Run Code Online (Sandbox Code Playgroud)
每秒10毫秒/秒的速度
Context Number of Invocations
------------------------------------------------------------
NEW BUNDLE 942
LAZY INITIALIZATION 3
TOTAL MESSAGES 1000
Run Code Online (Sandbox Code Playgroud)
在100毫秒内以100毫秒/秒的速率
Context Number of Invocations
------------------------------------------------------------
NEW BUNDLE 2447
LAZY INITIALIZATION 30
TOTAL MESSAGES 10000
Run Code Online (Sandbox Code Playgroud)
在100毫秒内以1000毫秒/秒的速率
Context Number of Invocations
------------------------------------------------------------
NEW BUNDLE 2293
LAZY INITIALIZATION 36
TOTAL MESSAGES 100000
Run Code Online (Sandbox Code Playgroud)
外卖
尽管start_bundle对于高吞吐量而言效果很好,但是无论吞吐量如何,惰性初始化在很大程度上都是最有效的。建议在Python Beam上执行昂贵的初始化的方法。考虑到官方文档中的这句话,这个结果也许并不令人惊讶:
设置-在每个DoFn实例之前调用一次;这在Python SDK中尚未实现,因此用户可以通过延迟初始化来解决
不过,所谓的“变通”这一事实并不特别令人鼓舞,也许我们可以在不久的将来看到更强大的功能。
代码样例
由Andreas Jansson提供:
def get_or_initialize_global(object_key, initialize_expensive_object):
if object_key in globals():
expensive_object = globals()[object_key]
else:
expensive_object = initialize_expensive_object()
globals()[object_key] = expensive_object
Run Code Online (Sandbox Code Playgroud)