如何向 Apache Beam 中的复合转换提供参数?

lpo*_*nng 6 python apache-beam

我正在使用 Apache Beam 的 Python SDK。

我有一些转换步骤,并希望使它们可重用,这表明我要编写一个自定义复合转换,如下所示:

class MyCompositeTransform(beam.PTransform):
def expand(self, pcoll, arg1, kwarg1=u'default'):
    result = (pcoll
              | 'Step 1' >> beam.Map(lambda f: SomeFn(f, arg1))
              | 'Last step' >> beam.Map(lambda f: SomeOtherFn(f, kwarg1))
              )
    return result
Run Code Online (Sandbox Code Playgroud)

我想要的是提供一些额外的参数arg1kwarg1这些参数是内部其他转换所需要的。但我不知道这是否是一种有效的方法,也不知道如何在管道中使用它。

有人可以给我指出方向吗?

ruh*_*ong 6

您可以通过构造函数提供参数PTransform。参数还可以采用侧面输入的形式(即另一个转换的数据输出)。这是一个同时使用“正常”参数和侧面输入的示例。

from typing import Dict, Any, Iterable
import apache_beam as beam


class MyCompositeTransform(beam.PTransform):

    def __init__(self, my_arg, my_side_input):
        super().__init__()
        self.my_arg= my_arg
        self.my_side_input= my_side_input

    @staticmethod
    def transform(
        element: Dict[str, Any], my_arg: int, my_side_input: Iterable[int]
    ) -> Dict[str, Any]:
        pass

    def expand(self, pcoll):
        return pcoll | "MyCompositeTransform" >> beam.Map(
            MyCompositeTransform.transform,
            self.my_arg,
            beam.pvalue.AsIter(self.my_side_input),
        )
Run Code Online (Sandbox Code Playgroud)

用于beam.pvalue定义如何将侧面输入传递给变换,例如它是单个值、anIterable还是具体化为a List

来自 Beam 的其他示例:(请参阅PTransformhttps://beam.apache.org/releases/pydoc/2.20.0/_modules/apache_beam/transforms/stats.html

  • 很好的答案,应该被接受:它回答了问题,开门见山,提供了一个清晰的示例,并且还提供了关于如何使用 beam.pvalue 正确使用数据的有用侧面输入(双关语)自由度。 (3认同)

Ant*_*ton 0

一般来说,您不能像您所描述的那样在运行时动态地将附加参数传递给转换。当您运行构建管道的控制器程序时,管道的结构被序列化、发送,然后在一组无权访问您的控制器程序的工作人员上并行执行,他们只能获取结构和实际的结构。你的代码ParDos

动态参数化执行的一种方法是提供额外的数据作为额外的输入,例如创建另一个PCollection填充参数值的数据,然后将其与 main 结合起来PCollection。例如使用side-inputs,或CoGroupByKey

如果您正在查看Cloud Dataflow,那么您可能会考虑使用管道模板ValueProviders,但不确定它们是否在 pyton 或非 Dataflow 运行程序中可用。