Apache Beam:Python SDK中的等效DoFn.Setup

And*_*son 5 google-cloud-dataflow apache-beam

在Beam Python DoFn中进行昂贵的一次性初始化的推荐方法是什么?Java SDK具有DoFn.Setup,但在Beam Python中似乎没有等效的东西。

当前threading.local()在DoFn初始化程序中将对象附加到的最佳方法吗?

sci*_*bor 10

数据流Python对于初始化昂贵对象的最佳方法并不是特别透明。有几种机制可以不频繁地实例化对象(目前不理想的是仅执行一次初始化)。以下概述了我进行的一些实验和得出的结论。希望Beam社区中的任何人都可以在我流浪的地方纠正我。

__init__

尽管该__init__方法可用于一次初始化一个昂贵的对象,但是这种初始化不会在Worker机器上发生。该对象将需要序列化才能发送给Worker,对于大型对象以及Tensorflow模型,Worker可能非常笨拙或根本无法工作。此外,由于此对象将被串行化并通过电线发送,因此在此处执行初始化并不安全,因为可以拦截有效载荷。建议不要使用此方法。

start_bundle()

数据流以离散的组处理数据,该组称为捆绑。这些在批处理过程中已经很好地定义,但是在流传输中,它们取决于吞吐量。没有用于配置Dataflow如何创建其捆绑包的机制,实际上捆绑包的大小完全由Dataflow决定。该start_bundle()方法将在Worker上调用,并可用于初始化状态,但是实验发现,在流上下文中,此方法的调用频率比期望的要高,并且昂贵的重新初始化将经常发生。

延迟初始化

Beam的方法提出了这种方法,这出人意料地是性能最高的。延迟初始化意味着您创建一些要初始化为的有状态参数None,然后执行如下代码:

if self.expensive_object is None:
    self.expensive_object = self.__expensive_initialization()
Run Code Online (Sandbox Code Playgroud)

您可以直接在您的process()方法中执行此代码。您还可以轻松地将一些依赖于global状态的辅助函数放在一起,以便您可以使用诸如此类的功能(该示例的示例在本文的底部):

self.expensive_object = get_or_initialize_global(‘expensive_object’, self.__expensive_initialization)
Run Code Online (Sandbox Code Playgroud)

实验

在使用start_bundle上述延迟初始化方法和上述两种方法配置的作业上运行以下实验,并通过适当的日志记录来指示调用。将各种吞吐量发布到适当的队列中,并相应地记录结果。

在1毫秒/秒的速率下持续100秒:

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                             100 
LAZY INITIALIZATION                                     25 
TOTAL MESSAGES                                         100 
Run Code Online (Sandbox Code Playgroud)

每秒10毫秒/秒的速度

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                             942 
LAZY INITIALIZATION                                      3 
TOTAL MESSAGES                                        1000 
Run Code Online (Sandbox Code Playgroud)

在100毫秒内以100毫秒/秒的速率

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                            2447 
LAZY INITIALIZATION                                     30 
TOTAL MESSAGES                                       10000 
Run Code Online (Sandbox Code Playgroud)

在100毫秒内以1000毫秒/秒的速率

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                            2293 
LAZY INITIALIZATION                                     36 
TOTAL MESSAGES                                      100000 
Run Code Online (Sandbox Code Playgroud)

外卖

尽管start_bundle对于高吞吐量而言效果很好,但是无论吞吐量如何,惰性初始化在很大程度上都是最有效的。建议在Python Beam上执行昂贵的初始化的方法。考虑到官方文档中的这句话,这个结果也许并不令人惊讶:

设置-在每个DoFn实例之前调用一次;这在Python SDK中尚未实现,因此用户可以通过延迟初始化来解决

不过,所谓的“变通”这一事实并不特别令人鼓舞,也许我们可以在不久的将来看到更强大的功能。

代码样例

由Andreas Jansson提供:

def get_or_initialize_global(object_key, initialize_expensive_object):
    if object_key in globals():
        expensive_object = globals()[object_key]
    else:
        expensive_object = initialize_expensive_object()
        globals()[object_key] = expensive_object
Run Code Online (Sandbox Code Playgroud)


rob*_*twb 6

安装和拆卸现已添加到 Python SDK 中,并且是在 Beam Python DoFn 中进行昂贵的一次性初始化的推荐方法。