我们正在使用 AirflowKubernetesPodOperator作为我们的数据管道。我们想要添加的是通过 UI 传递参数的选项。
目前,我们使用它的方式是使用不同的 yaml 文件来存储运算符的参数,我们不是直接调用运算符,而是调用一个函数来进行一些准备并返回运算符,如下所示:
def prep_kubernetes_pod_operator(yaml):
# ... read yaml and extract params
return KubernetesPodOperator(params)
with DAG(...):
task1 = prep_kubernetes_pod_operator(yaml)
Run Code Online (Sandbox Code Playgroud)
对于我们来说,这很有效,我们可以保持 dag 文件相当轻量,但是现在我们想添加可以通过 UI 添加一些额外参数的功能。我知道触发器参数可以通过 访问kwargs['dag_run'].conf,但我没有成功将它们拉入 Python 函数。
我尝试的另一件事是创建一个自定义运算符,因为它可以识别参数,但我无法设法KubernetesPodOperator在执行部分中调用(而且我猜在运算符中调用运算符无论如何都不是正确的解决方案)。
更新:
按照 NicoE 的建议,我开始扩展KubernetesPodOperator。
我现在遇到的错误是,当我解析 yaml 并分配参数时,父参数变成元组并引发类型错误。
达格:
task = NewKPO(
task_id="task1",
yaml_path=yaml_path)
Run Code Online (Sandbox Code Playgroud)
操作员:
class NewKPO(KubernetesPodOperator):
@apply_defaults
def __init__(
self,
yaml_path: str,
name: str = "default",
*args,
**kwargs) -> None:
self.yaml_path = yaml_path
self.name = name
super(NewKPO, self).__init__(
name=name, # …Run Code Online (Sandbox Code Playgroud)