lpo*_*nng 6 python apache-beam
我正在使用 Apache Beam 的 Python SDK。
我有一些转换步骤,并希望使它们可重用,这表明我要编写一个自定义复合转换,如下所示:
class MyCompositeTransform(beam.PTransform):
def expand(self, pcoll, arg1, kwarg1=u'default'):
result = (pcoll
| 'Step 1' >> beam.Map(lambda f: SomeFn(f, arg1))
| 'Last step' >> beam.Map(lambda f: SomeOtherFn(f, kwarg1))
)
return result
Run Code Online (Sandbox Code Playgroud)
我想要的是提供一些额外的参数arg1,kwarg1这些参数是内部其他转换所需要的。但我不知道这是否是一种有效的方法,也不知道如何在管道中使用它。
有人可以给我指出方向吗?
您可以通过构造函数提供参数PTransform。参数还可以采用侧面输入的形式(即另一个转换的数据输出)。这是一个同时使用“正常”参数和侧面输入的示例。
from typing import Dict, Any, Iterable
import apache_beam as beam
class MyCompositeTransform(beam.PTransform):
def __init__(self, my_arg, my_side_input):
super().__init__()
self.my_arg= my_arg
self.my_side_input= my_side_input
@staticmethod
def transform(
element: Dict[str, Any], my_arg: int, my_side_input: Iterable[int]
) -> Dict[str, Any]:
pass
def expand(self, pcoll):
return pcoll | "MyCompositeTransform" >> beam.Map(
MyCompositeTransform.transform,
self.my_arg,
beam.pvalue.AsIter(self.my_side_input),
)
Run Code Online (Sandbox Code Playgroud)
用于beam.pvalue定义如何将侧面输入传递给变换,例如它是单个值、anIterable还是具体化为a List?
来自 Beam 的其他示例:(请参阅PTransform)https://beam.apache.org/releases/pydoc/2.20.0/_modules/apache_beam/transforms/stats.html
一般来说,您不能像您所描述的那样在运行时动态地将附加参数传递给转换。当您运行构建管道的控制器程序时,管道的结构被序列化、发送,然后在一组无权访问您的控制器程序的工作人员上并行执行,他们只能获取结构和实际的结构。你的代码ParDos。
动态参数化执行的一种方法是提供额外的数据作为额外的输入,例如创建另一个PCollection填充参数值的数据,然后将其与 main 结合起来PCollection。例如使用side-inputs,或CoGroupByKey。
如果您正在查看Cloud Dataflow,那么您可能会考虑使用管道模板ValueProviders,但不确定它们是否在 pyton 或非 Dataflow 运行程序中可用。
| 归档时间: |
|
| 查看次数: |
3806 次 |
| 最近记录: |