使用 jinja 模板中的气流连接

Mik*_*rev 4 airflow

我正在尝试使用环境变量将 DB 参数传递给 BashOperator,但我找不到任何文档/示例如何使用 Jinja 模板中的连接。

所以我正在寻找类似于变量的东西

echo {{ var.value.<variable_name> }}
Run Code Online (Sandbox Code Playgroud)

ece*_*ulm 7

我的PR添加了该{{ conn.my_conn_id.login }}语法,它将在 Airflow 2.2.0 中可用(截至 2021 年 9 月 22 日尚未发布)。

请参阅此处未发布的文档以获取模板参考

对于 2.1.4 及更早版本:

改进以前的答案,

为每个 DAG 定义宏:{{conn.<conn_id>}}

您可以 conn.<connection_name>.host使用以下宏来获取语法:

class ConnectionGrabber:
    def __getattr__(self, name):
        return  Connection.get_connection_from_secrets(name)
dag = DAG(user_defined_macros={'connection': ConnectionGrabber()}

Run Code Online (Sandbox Code Playgroud)

将名称注入connectionjinja 模板上下文的connection就是一个ConnectionGrabber实例。这ConnectionGrabber提供了动态/托管属性,因此当您请求属性my_conn_id(如 connection.my_conn_id)时,它将使用 执行查找airflow.models.Connection.get_connection_from_secrets并返回该属性,从那里您可以使用、、等a.m.Connection属性。hostloginpassword

mssql在 jinja 模板中使用以下命令访问连接的完整示例bash_command='echo {{connection.mssql.host }}'

from airflow.models import DAG,Connection
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago


class ConnectionGrabber:
    def __getattr__(self, name):
        return  Connection.get_connection_from_secrets(name)


args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}

dag = DAG(
    dag_id='test_connection',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    user_defined_macros={'connection': ConnectionGrabber()}
)

task = BashOperator(task_id='read_connection', bash_command='echo {{connection.mssql.host }}', dag=dag)
Run Code Online (Sandbox Code Playgroud)

在插件中定义宏:{{macros.conn.value.<conn_id>}}

如果您想在所有 DAG 中使用此宏,您可以将其包装在插件中,如下所示:

# $AIRFLOW_HOME/plugins/connection_macro.py
from airflow.plugins_manager import AirflowPlugin
from airflow.models import Connection


class ConnectionGrabber:
    __name__ = "value"
    def __str__(self):
        return self.__name__
    def __getattr__(self, name):
        return  Connection.get_connection_from_secrets(name)

class MacrosPlugin(AirflowPlugin):
    name = "conn"
    macros = [ConnectionGrabber()]
Run Code Online (Sandbox Code Playgroud)

检查插件是否可以加载airflow plugins

airflow plugins
name | source                    | macros
=====+===========================+=======
conn | $PLUGINS_FOLDER/macros.py | value
Run Code Online (Sandbox Code Playgroud)

{{ macros.conn.value.<conn_id>.host }}然后你可以像这样在 jinja 模板中使用

   task = BashOperator(task_id='read_connection', bash_command='echo macros.conn.value.mssql.host = {{macros.conn.value.mssql.host }}', dag=dag)
Run Code Online (Sandbox Code Playgroud)

我还提出了一个问题conn.value.<conn_id>来本地添加语法


Ela*_*lad 6

对于气流 >= 2.2.0

假设您有 conn id,test_conn您可以通过以下方式直接使用宏:

{{ conn.test_conn }} 所以你得到任何连接属性,如:

{{ conn.test_conn.host }}, {{ conn.test_conn.login }},{{ conn.test_conn.password }}等等。

对于气流 < 2.2.0

没有现成的宏,但是您可以创建自定义宏来解决这个问题。

连接示例:

在此处输入图片说明

创建宏:

def get_host(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.host

def get_schema(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.schema

def get_login(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.login
Run Code Online (Sandbox Code Playgroud)

在 DAG 中使用它们:

def print_function(**context):
    print(f"host={context['host']} schema={context['schema']} login={context['login']}")

user_macros = {
    'get_host': get_host,
    'get_schema': get_schema,
    'get_login': get_login,
}

with DAG(
    dag_id='connection',
    default_args=default_args,
    schedule_interval=None,
    user_defined_macros=user_macros,
) as dag:

# Example how to use as function
python_op = PythonOperator( 
    task_id='python_task',
    provide_context=True,
    python_callable=print_function,
    op_kwargs={
        'host': get_host("test_conn"),
        'schema': get_schema("test_conn"),
        'login': get_login("test_conn"),
    }
)

# Example how to use as Jinja string
bash_op = BashOperator( 
    task_id='bash_task',
    bash_command='echo {{ get_host("test_conn") }} {{ get_schema("test_conn") }} {{ get_login("test_conn") }} ',
)
Run Code Online (Sandbox Code Playgroud)

渲染PythonOperator示例: 在此处输入图片说明

在此处输入图片说明

渲染BashOperator示例:

在此处输入图片说明

一般说明:此代码的作用是创建一个自定义函数func()以供使用,user_defined_macros从而提供使用它的能力,就像 Airflow 本身定义的这个宏一样。您可以通过以下方式访问模板:{{ func() }}如示例中所示,该函数允许接受参数。

请注意,您可以为连接对象中的所有字段创建此类函数。

请谨慎使用它,将密码作为文本传递可能不是一个好主意。