如何在 GCP Cloud Composer 上的 Apache Airflow 上使用带有“KubernetesPodOperator”的连接钩子作为环境变量

wab*_*wab 7 kubernetes airflow google-cloud-composer kubernetes-operator

我想使用保存在airflow使用 KubernetesPodOperator.

在开发映像时,我使用环境变量将数据库连接信息传递给容器,但生产环境将数据库保存为连接挂钩。

提取数据库连接信息并将其传递给容器的最佳方法是什么?

env_vars = {'database_usr': 'xxx', 'database_pas': 'xxx'}
Run Code Online (Sandbox Code Playgroud)
KubernetesPodOperator(
        dag=dag,
        task_id="example-task",
        name="example-task",
        namespace="default",
        image="eu.gcr.io/repo/image:tag",
        image_pull_policy="Always",
        arguments=["-v", "image-command", "image-arg"],
        env_vars=env_vars,
    )
Run Code Online (Sandbox Code Playgroud)

wab*_*wab 3

我当前的解决方案是使用以下命令从连接中获取变量BaseHook

from airflow.hooks.base_hook import BaseHook


def connection_to_dict(connection_id):
    """Returns connection params from Airflow as a dictionary.

    Parameters
    ----------
    connection_id : str
        Name of the connection in Airflow, e.g. `mysql_default`

    Returns
    -------
    dict
        Unencrypted values.
    """
    conn_obj = BaseHook.get_connection(connection_id)
    d = conn_obj.__dict__
    if ('is_encrypted', True) in d.items():
        d['password'] = conn_obj.get_password()
    return d
Run Code Online (Sandbox Code Playgroud)

然后将它们作为环境变量传递给 Kubernetes pod 操作员。