将 Airflow 中操作员参数中的 jinja 模板格式化为 INT

Gab*_*abe 3 jinja2 airflow

我正在尝试将 jinja 模板参数格式化为整数,以便我可以将其传递给需要 INT (可以是自定义或 PythonOperator)的运算符,但我无法这样做。

请参阅下面的示例 DAG。我正在使用内置的 Jinja 过滤器| int,但这不起作用 - 类型仍然存在<class 'str'>

我对 Airflow 还很陌生,但根据我读到的有关 Jinja/Airflow 作品的内容,我认为这是不可能的。我看到两个主要的解决方法:

  • 将运算符参数更改为期望字符串并处理下面的转换。
  • 在单独的PythonOperator中处理此转换,该 PythonOperator 将字符串转换为 int 并使用 xcom/task 上下文导出。(我认为这会起作用但不确定)

请让我知道任何其他解决方法

def greet(mystr):
    print (mystr)
    print(type(mystr))

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2)        
}

dag = DAG(
    'template_dag',
    default_args=default_args,
    description='template',
    schedule_interval='0 13 * * *'
)


with dag:

    # foo = "{{ var.value.my_custom_var | int }}"  # from variable
    foo = "{{ execution_date.int_timestamp | int }}"  # built in macro

    # could be MyCustomOperator
    opr_greet = PythonOperator(task_id='greet',
                               python_callable=greet,
                               op_kwargs={'mystr': foo}
                               )

    opr_greet
Run Code Online (Sandbox Code Playgroud)

气流 11.10.11

Gab*_*abe 10

更新的答案:

从 Airflow 2.1 开始,您可以传递render_template_as_native_obj=True给 dag,Airflow 将返回 Python 类型(dict、int 等)而不是字符串。请参阅此拉取请求

dag = DAG(
    dag_id="example_template_as_python_object",
    schedule_interval=None,
    start_date=days_ago(2),
    render_template_as_native_obj=True,
)
Run Code Online (Sandbox Code Playgroud)

先前版本的旧答案:

我发现一个相关的问题提供了最佳的解决方法,IMO。

Airflow xcom pull 仅返回字符串

技巧是使用 PythonOperator,在那里进行数据类型转换,然后使用参数调用主运算符。下面是将 json 字符串转换为 dict 对象的示例。同样适用于将 string 转换为 int 等。

def my_func(ds, **kwargs):
    ti = kwargs['ti']
    body = ti.xcom_pull(task_ids='privious_task_id')
    import_body= json.loads(body)
    op = CloudSqlInstanceImportOperator(
            project_id=GCP_PROJECT_ID,
            body= import_body,
            instance=INSTANCE_NAME,
            gcp_conn_id='postgres_default',
            task_id='sql_import_task',
            validate_body=True,
        )
    op.execute()
    

p = PythonOperator(task_id='python_task', python_callable=my_func)
Run Code Online (Sandbox Code Playgroud)