我们使用气流作为调度程序.我想在DAG中调用一个简单的bash运算符.bash脚本需要使用密码作为参数来进行进一步处理.
如何在气流(config/variables/connection)中安全地存储密码并在dag定义文件中访问它.
我是气流和Python的新手,所以我们将非常感谢代码片段.
Dan*_*Lee 34
您可以将密码存储在挂钩中 - 只要您设置了您的fernet密钥,就会对其进行加密.
以下是如何创建连接的方法.
from airflow.models import Connection
def create_conn(username, password, host=None):
new_conn = Connection(conn_id=f'{username}_connection',
login=username,
host=host if host else None)
new_conn.set_password(password)
Run Code Online (Sandbox Code Playgroud)
然后,在您设置的数据库中加密此密码.
要访问此密码:
from airflow.hooks.base_hook import BaseHook
connection = BaseHook.get_connection("username_connection")
password = connection.password # This is a getter that returns the unencrypted password.
Run Code Online (Sandbox Code Playgroud)
编辑:
有一种更简单的方法可以通过UI创建连接:
小智 14
这个答案可能有点晚了,但我认为它很重要并且仍然是最新的:
使用时
BaseHook.get_hook(conn_id=conn_id)
Run Code Online (Sandbox Code Playgroud)
Airflow(我们在版本 2.2.3 中观察到)将凭据以纯文本形式记录到路径下的日志文件中
/var/log/airflow/scheduler/<date>/<dag>/
您肯定不希望在那里输入登录名和密码。
为了避免这种情况,请使用get_connection_from_secrets
如下所示:
from airflow.models import Connection
Connection.get_connection_from_secrets("<connection>")
Run Code Online (Sandbox Code Playgroud)
这不会将任何凭据记录到文件中。
from airflow.hooks.base_hook import BaseHook
conn = BaseHook.get_connection('bigquery_connection')
print(conn.get_extra())
Run Code Online (Sandbox Code Playgroud)
这些conn.get_extra()
将为您提供存储在连接中的设置的 JSON。
您可以将密码存储在气流变量中,https://airflow.incubator.apache.org/ui.html#variable-view
from airflow.models import Variable
command = """
echo "{{ params.my_param }}"
"""
task = BashOperator(
task_id='templated',
bash_command=command,
params={'my_param': MyPass},
dag=dag)
Run Code Online (Sandbox Code Playgroud)
使用管理/连接选项卡中的 GUI。
真正有效的答案是通过编程方式在 Airflow 中保留连接,其工作原理如下面的代码片段所示。
下面的示例myservice
代表一些外部凭证缓存。
使用以下方法时,您可以将外部管理的连接存储在气流内部。无需从每个 dag/任务中轮询服务。相反,您可以依赖 Airflow 的连接机制,并且不必失去 Airflow 公开的 Operator(如果您的组织允许这样做)。
技巧是用来airflow.utils.db.merge_conn
处理创建的连接对象的设置。
from airflow.utils.db import provide_session, merge_conn
creds = {"user": myservice.get_user(), "pwd": myservice.get_pwd()
c = Connection(conn_id=f'your_airflow_connection_id_here',
login=creds["user"],
host=None)
c.set_password(creds["pwd"])
merge_conn(c)
Run Code Online (Sandbox Code Playgroud)
merge_conn 是内置的,由气流本身用来初始化空连接。但它不会自动更新。为此,您必须使用自己的辅助函数。
from airflow.utils.db import provide_session
@provide_session
def store_conn(conn, session=None):
from airflow.models import Connection
if session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
logging.info("Connection object already exists, attempting to remove it...")
session.delete(session.query(Connection).filter(Connection.conn_id == conn.conn_id).first())
session.add(conn)
session.commit()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
12515 次 |
最近记录: |