相关疑难解决方法(0)

Airflow:如何在函数中使用触发参数

我们正在使用 AirflowKubernetesPodOperator作为我们的数据管道。我们想要添加的是通过 UI 传递参数的选项。

目前,我们使用它的方式是使用不同的 yaml 文件来存储运算符的参数,我们不是直接调用运算符,而是调用一个函数来进行一些准备并返回运算符,如下所示:

def prep_kubernetes_pod_operator(yaml):

    # ... read yaml and extract params

    return KubernetesPodOperator(params)

with DAG(...):
    
    task1 = prep_kubernetes_pod_operator(yaml)

Run Code Online (Sandbox Code Playgroud)

对于我们来说,这很有效,我们可以保持 dag 文件相当轻量,但是现在我们想添加可以通过 UI 添加一些额外参数的功能。我知道触发器参数可以通过 访问kwargs['dag_run'].conf,但我没有成功将它们拉入 Python 函数。

我尝试的另一件事是创建一个自定义运算符,因为它可以识别参数,但我无法设法KubernetesPodOperator在执行部分中调用(而且我猜在运算符中调用运算符无论如何都不是正确的解决方案)。

更新:

按照 NicoE 的建议,我开始扩展KubernetesPodOperator

我现在遇到的错误是,当我解析 yaml 并分配参数时,父参数变成元组并引发类型错误。

达格:

task = NewKPO(
    task_id="task1",
    yaml_path=yaml_path)
Run Code Online (Sandbox Code Playgroud)

操作员:

class NewKPO(KubernetesPodOperator):
   @apply_defaults
   def __init__(
           self,
           yaml_path: str,
           name: str = "default",
           *args,
           **kwargs) -> None:
       self.yaml_path = yaml_path
       self.name = name
       super(NewKPO, self).__init__(
           name=name, # …
Run Code Online (Sandbox Code Playgroud)

python airflow

8
推荐指数
1
解决办法
8015
查看次数

对于Apache Airflow,如何通过CLI手动触发DAG时如何传递参数?

我使用Airflow来管理ETL任务的执行和计划。已创建DAG,并且工作正常。但是通过cli手动触发dag时可以传递参数。

例如:我的DAG每天在01:30运行,并处理昨天的数据(时间范围从昨天的01:30到今天的01:30)。数据源可能存在一些问题。我需要重新处理这些数据(手动指定时间范围)。

因此,我可以在预定的时候创建这样的气流DAG,使其默认时间范围为昨天的01:30到今天的01:30。然后,如果数据源有任何问题,我需要手动触发DAG并手动将时间范围作为参数传递。

据我所知airflow test-tp可以通过PARAMS的任务。但这仅用于测试特定任务。并且airflow trigger_dag没有-tp选择。那么有没有办法将tigger_dag传递给DAG,然后操作员可以读取这些参数?

谢谢!

airflow

4
推荐指数
1
解决办法
5643
查看次数

标签 统计

airflow ×2

python ×1