使用Apache airflow存储和访问密码

Anu*_*nup 20 python airflow

我们使用气流作为调度程序.我想在DAG中调用一个简单的bash运算符.bash脚本需要使用密码作为参数来进行进一步处理.

如何在气流(config/variables/connection)中安全地存储密码并在dag定义文件中访问它.

我是气流和Python的新手,所以我们将非常感谢代码片段.

Dan*_*Lee 34

您可以将密码存储在挂钩中 - 只要您设置了您的fernet密钥,就会对其进行加密.

以下是如何创建连接的方法.

from airflow.models import Connection
def create_conn(username, password, host=None):
    new_conn = Connection(conn_id=f'{username}_connection',
                                  login=username,
                                  host=host if host else None)
    new_conn.set_password(password)
Run Code Online (Sandbox Code Playgroud)

然后,在您设置的数据库中加密此密码.

要访问此密码:

from airflow.hooks.base_hook import BaseHook

 connection = BaseHook.get_connection("username_connection")
 password = connection.password # This is a getter that returns the unencrypted password.
Run Code Online (Sandbox Code Playgroud)

编辑:

有一种更简单的方法可以通过UI创建连接:

主菜单 然后: 创建连接

  • 这是创建postgres连接字符串的方法。`connection_string ='postgresql://'+ str(连接。登录)+':'+ str(连接。密码)+'@'\ + str(连接。主机)+':'+ str(连接。端口) .encode('utf-8')+'/'+ str(connection.schema)` (2认同)

小智 14

这个答案可能有点晚了,但我认为它很重要并且仍然是最新的:

使用时

BaseHook.get_hook(conn_id=conn_id)
Run Code Online (Sandbox Code Playgroud)

Airflow(我们在版本 2.2.3 中观察到)将凭据以纯文本形式记录到路径下的日志文件中 /var/log/airflow/scheduler/<date>/<dag>/

您肯定不希望在那里输入登录名和密码。

为了避免这种情况,请使用get_connection_from_secrets如下所示:

from airflow.models import Connection
Connection.get_connection_from_secrets("<connection>")
Run Code Online (Sandbox Code Playgroud)

这不会将任何凭据记录到文件中。


Nik*_*dij 7

from airflow.hooks.base_hook import BaseHook
conn = BaseHook.get_connection('bigquery_connection')
print(conn.get_extra())
Run Code Online (Sandbox Code Playgroud)

这些conn.get_extra()将为您提供存储在连接中的设置的 JSON。


Che*_*zhi 5

您可以将密码存储在气流变量中,https://airflow.incubator.apache.org/ui.html#variable-view

  1. 在 UI 中创建一个带有 key&value 的变量,例如 mypass:XXX
  2. 导入变量 from airflow.models import Variable
  3. MyPass = Variable.get("mypass")
  4. 将 MyPass 传递给您的 bash 脚本:
command = """
          echo "{{ params.my_param }}"
          """



task = BashOperator(
        task_id='templated',
        bash_command=command,
        params={'my_param': MyPass},
        dag=dag)
Run Code Online (Sandbox Code Playgroud)

  • @MatthijsBrouns 的评论不再正确(对于 Airflow 1.9+)。变量存储在加密的 DB 中。但是,请注意,它们的值在 Airflow Web UI 中以纯文本形式显示,如果您有机会在不应看到该值的人面前浏览那里。 (7认同)

Hav*_*nar 5

使用管理/连接选项卡中的 GUI。

真正有效的答案是通过编程方式在 Airflow 中保留连接,其工作原理如下面的代码片段所示。

下面的示例myservice代表一些外部凭证缓存。

使用以下方法时,您可以将外部管理的连接存储在气流内部。无需从每个 dag/任务中轮询服务。相反,您可以依赖 Airflow 的连接机制,并且不必失去 Airflow 公开的 Operator(如果您的组织允许这样做)。

技巧是用来airflow.utils.db.merge_conn处理创建的连接对象的设置。

    from airflow.utils.db import provide_session, merge_conn




    creds = {"user": myservice.get_user(), "pwd": myservice.get_pwd() 

    c = Connection(conn_id=f'your_airflow_connection_id_here',
                   login=creds["user"],
                   host=None)
    c.set_password(creds["pwd"])
    merge_conn(c)
Run Code Online (Sandbox Code Playgroud)

merge_conn 是内置的,由气流本身用来初始化空连接。但它不会自动更新。为此,您必须使用自己的辅助函数。

from airflow.utils.db import provide_session

@provide_session
def store_conn(conn, session=None):
    from airflow.models import Connection
    if session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
        logging.info("Connection object already exists, attempting to remove it...")
        session.delete(session.query(Connection).filter(Connection.conn_id == conn.conn_id).first())

    session.add(conn)
    session.commit()
Run Code Online (Sandbox Code Playgroud)