我正在尝试使用环境变量将 DB 参数传递给 BashOperator,但我找不到任何文档/示例如何使用 Jinja 模板中的连接。
所以我正在寻找类似于变量的东西
echo {{ var.value.<variable_name> }}
Run Code Online (Sandbox Code Playgroud)
我的PR添加了该{{ conn.my_conn_id.login }}
语法,它将在 Airflow 2.2.0 中可用(截至 2021 年 9 月 22 日尚未发布)。
请参阅此处未发布的文档以获取模板参考
对于 2.1.4 及更早版本:
改进以前的答案,
{{conn.<conn_id>}}
您可以 conn.<connection_name>.host
使用以下宏来获取语法:
class ConnectionGrabber:
def __getattr__(self, name):
return Connection.get_connection_from_secrets(name)
dag = DAG(user_defined_macros={'connection': ConnectionGrabber()}
Run Code Online (Sandbox Code Playgroud)
将名称注入connection
jinja 模板上下文的connection
就是一个ConnectionGrabber
实例。这ConnectionGrabber
提供了动态/托管属性,因此当您请求属性my_conn_id
(如 connection.my_conn_id
)时,它将使用 执行查找airflow.models.Connection.get_connection_from_secrets
并返回该属性,从那里您可以使用、、等a.m.Connection
属性。host
login
password
mssql
在 jinja 模板中使用以下命令访问连接的完整示例bash_command='echo {{connection.mssql.host }}'
:
from airflow.models import DAG,Connection
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
class ConnectionGrabber:
def __getattr__(self, name):
return Connection.get_connection_from_secrets(name)
args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
dag = DAG(
dag_id='test_connection',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
user_defined_macros={'connection': ConnectionGrabber()}
)
task = BashOperator(task_id='read_connection', bash_command='echo {{connection.mssql.host }}', dag=dag)
Run Code Online (Sandbox Code Playgroud)
{{macros.conn.value.<conn_id>}}
如果您想在所有 DAG 中使用此宏,您可以将其包装在插件中,如下所示:
# $AIRFLOW_HOME/plugins/connection_macro.py
from airflow.plugins_manager import AirflowPlugin
from airflow.models import Connection
class ConnectionGrabber:
__name__ = "value"
def __str__(self):
return self.__name__
def __getattr__(self, name):
return Connection.get_connection_from_secrets(name)
class MacrosPlugin(AirflowPlugin):
name = "conn"
macros = [ConnectionGrabber()]
Run Code Online (Sandbox Code Playgroud)
检查插件是否可以加载airflow plugins
airflow plugins
name | source | macros
=====+===========================+=======
conn | $PLUGINS_FOLDER/macros.py | value
Run Code Online (Sandbox Code Playgroud)
{{ macros.conn.value.<conn_id>.host }}
然后你可以像这样在 jinja 模板中使用
task = BashOperator(task_id='read_connection', bash_command='echo macros.conn.value.mssql.host = {{macros.conn.value.mssql.host }}', dag=dag)
Run Code Online (Sandbox Code Playgroud)
我还提出了一个问题conn.value.<conn_id>
来本地添加语法
对于气流 >= 2.2.0:
假设您有 conn id,test_conn
您可以通过以下方式直接使用宏:
{{ conn.test_conn }}
所以你得到任何连接属性,如:
{{ conn.test_conn.host }}
, {{ conn.test_conn.login }}
,{{ conn.test_conn.password }}
等等。
对于气流 < 2.2.0:
没有现成的宏,但是您可以创建自定义宏来解决这个问题。
连接示例:
创建宏:
def get_host(conn_id):
connection = BaseHook.get_connection(conn_id)
return connection.host
def get_schema(conn_id):
connection = BaseHook.get_connection(conn_id)
return connection.schema
def get_login(conn_id):
connection = BaseHook.get_connection(conn_id)
return connection.login
Run Code Online (Sandbox Code Playgroud)
在 DAG 中使用它们:
def print_function(**context):
print(f"host={context['host']} schema={context['schema']} login={context['login']}")
user_macros = {
'get_host': get_host,
'get_schema': get_schema,
'get_login': get_login,
}
with DAG(
dag_id='connection',
default_args=default_args,
schedule_interval=None,
user_defined_macros=user_macros,
) as dag:
# Example how to use as function
python_op = PythonOperator(
task_id='python_task',
provide_context=True,
python_callable=print_function,
op_kwargs={
'host': get_host("test_conn"),
'schema': get_schema("test_conn"),
'login': get_login("test_conn"),
}
)
# Example how to use as Jinja string
bash_op = BashOperator(
task_id='bash_task',
bash_command='echo {{ get_host("test_conn") }} {{ get_schema("test_conn") }} {{ get_login("test_conn") }} ',
)
Run Code Online (Sandbox Code Playgroud)
渲染BashOperator
示例:
一般说明:此代码的作用是创建一个自定义函数func()
以供使用,user_defined_macros
从而提供使用它的能力,就像 Airflow 本身定义的这个宏一样。您可以通过以下方式访问模板:{{ func() }}
如示例中所示,该函数允许接受参数。
请注意,您可以为连接对象中的所有字段创建此类函数。
请谨慎使用它,将密码作为文本传递可能不是一个好主意。
归档时间: |
|
查看次数: |
1023 次 |
最近记录: |