Airflow:如何在函数中使用触发参数

a54*_*54i 8 python airflow

我们正在使用 AirflowKubernetesPodOperator作为我们的数据管道。我们想要添加的是通过 UI 传递参数的选项。

目前,我们使用它的方式是使用不同的 yaml 文件来存储运算符的参数,我们不是直接调用运算符,而是调用一个函数来进行一些准备并返回运算符,如下所示:

def prep_kubernetes_pod_operator(yaml):

    # ... read yaml and extract params

    return KubernetesPodOperator(params)

with DAG(...):
    
    task1 = prep_kubernetes_pod_operator(yaml)

Run Code Online (Sandbox Code Playgroud)

对于我们来说,这很有效,我们可以保持 dag 文件相当轻量,但是现在我们想添加可以通过 UI 添加一些额外参数的功能。我知道触发器参数可以通过 访问kwargs['dag_run'].conf,但我没有成功将它们拉入 Python 函数。

我尝试的另一件事是创建一个自定义运算符,因为它可以识别参数,但我无法设法KubernetesPodOperator在执行部分中调用(而且我猜在运算符中调用运算符无论如何都不是正确的解决方案)。

更新:

按照 NicoE 的建议,我开始扩展KubernetesPodOperator

我现在遇到的错误是,当我解析 yaml 并分配参数时,父参数变成元组并引发类型错误。

达格:

task = NewKPO(
    task_id="task1",
    yaml_path=yaml_path)
Run Code Online (Sandbox Code Playgroud)

操作员:

class NewKPO(KubernetesPodOperator):
   @apply_defaults
   def __init__(
           self,
           yaml_path: str,
           name: str = "default",
           *args,
           **kwargs) -> None:
       self.yaml_path = yaml_path
       self.name = name
       super(NewKPO, self).__init__(
           name=name, # DAG is not parsed without this line - 'key has to be string'
           *args,
           **kwargs)

   def execute(self, context):
       # parsing yaml and adding context["dag_run"].conf (...)
       self.name = yaml.name
       self.image = yaml.image
       self.secrets = yaml.secrets
       #(...) if i run a type(self.secrets) here I will get tuple
       return super(NewKPO, self).execute(context)
Run Code Online (Sandbox Code Playgroud)

Nic*_*coE 24

您可以使用params,它是一个可以在 DAG 级别参数定义的字典,并且在每个任务中都可以访问。适用于从 UI 派生的每个运算符BaseOperator,也可以从 UI 进行设置。

以下示例展示了如何将其与不同的运算符一起使用。 params可以在default_argsdict 中定义或作为 DAG 对象的 arg 进行定义。

default_args = {
    "owner": "airflow",
    'params': {
        "param1": "first_param",
        "param2": "second_param"
    }
}

dag = DAG(
    dag_id="example_dag_params",
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval="@once",
    tags=['example_dags'],
    catchup=False
)

Run Code Online (Sandbox Code Playgroud)

从 UI 触发此 DAG 时,您可以添加一个额外的参数:

从 UI 触发 DAG 时设置参数

可以在模板化字段中访问参数,如下BashOperator所示:

with dag:

    bash_task = BashOperator(
        task_id='bash_task',
        bash_command='echo bash_task: {{ params.param1 }}')

Run Code Online (Sandbox Code Playgroud)

bash_task日志输出:

{bash.py:158} INFO - Running command: echo bash_task: first_param
{bash.py:169} INFO - Output:
{bash.py:173} INFO - bash_task: first_param
{bash.py:177} INFO - Command exited with return code 0
Run Code Online (Sandbox Code Playgroud)

参数可以在执行上下文中访问,例如 python_callable

{bash.py:158} INFO - Running command: echo bash_task: first_param
{bash.py:169} INFO - Output:
{bash.py:173} INFO - bash_task: first_param
{bash.py:177} INFO - Command exited with return code 0
Run Code Online (Sandbox Code Playgroud)

输出:

{logging_mixin.py:104} INFO - Task_id: python_task
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
Run Code Online (Sandbox Code Playgroud)

您还可以在任务级别定义中添加参数:


    def _print_params(**kwargs):
        print(f"Task_id: {kwargs['ti'].task_id}")
        for k, v in kwargs['params'].items():
            print(f"{k}:{v}")

    python_task = PythonOperator(
        task_id='python_task',
        python_callable=_print_params,
    )
Run Code Online (Sandbox Code Playgroud)

输出:

{logging_mixin.py:104} INFO - Task_id: python_task_2
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param4:param defined at task level
{logging_mixin.py:104} INFO - param3:param_from_the_UI
Run Code Online (Sandbox Code Playgroud)

按照示例,您可以定义一个继承自 的自定义 Operator BaseOperator

{logging_mixin.py:104} INFO - Task_id: python_task
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
Run Code Online (Sandbox Code Playgroud)

一个示例任务是:

    python_task_2 = PythonOperator(
        task_id='python_task_2',
        python_callable=_print_params,
        params={'param4': 'param defined at task level'}
    )
Run Code Online (Sandbox Code Playgroud)

输出:

{logging_mixin.py:104} INFO - Task_id: custom_operator_task
{logging_mixin.py:104} INFO - custom_arg: default
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
Run Code Online (Sandbox Code Playgroud)

进口:

{logging_mixin.py:104} INFO - Task_id: python_task_2
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param4:param defined at task level
{logging_mixin.py:104} INFO - param3:param_from_the_UI
Run Code Online (Sandbox Code Playgroud)

我希望这对你有用!