在运行时在 Airflow 运算符中创建和使用连接

y2k*_*ham 3 airflow

注意:这不是重复的


我必须从我的Airflow DAG. 实现这一目标的直接方法是SSHHook.

问题在于远程系统是一个EMR集群,它本身是在运行时(由上游任务)使用EmrCreateJobFlowOperator. 因此,虽然我可以掌握job_flow_id已启动的 EMR 集群(usingXCOM),但我需要的是将 anssh_conn_id传递给每个下游任务。


查看文档代码,很明显 Airflow 将尝试conn_iddb环境变量中查找此连接(使用),因此现在问题归结为能够在运行时设置这两个属性中的任何一个(从内部一个operator)。

这似乎是一个相当普遍的问题,因为如果无法实现,那么 的效用EmrCreateJobFlowOperator将受到严重阻碍;但我还没有遇到任何证明它的例子。


  • 是否可以从 Airflow 操作员内部创建(并销毁)其中任何一个?
    1. 连接(保留在 Airflow 的数据库中)
    2. 环境变量(应该可供所有下游任务访问,而不仅仅是此处所述的当前任务)
  • 如果没有,我有什么选择?

我上线了

  • Airflow v1.10
  • Python 3.6.6
  • emr-5.15 (如果需要可以升级)

vil*_*asv 8

连接来自 ORM

是的,如果您足够小心,您可以在运行时创建连接,甚至在 DAG 创建时也是如此。Airflow 在其内部模型上是完全透明的,因此您可以直接与底层 SqlAlchemy 交互。正如本答案中最初举例说明的那样,它很简单:

from airflow.models import Connection
from airflow import settings

def create_conn(username, password, host=None):
    new_conn = Connection(conn_id=f'{username}_connection',
                                  login=username,
                                  host=host if host else None)
    new_conn.set_password(password)

    session = settings.Session()
    session.add(new_conn)
    session.commit()
Run Code Online (Sandbox Code Playgroud)

当然,您可以在其中与 EMR 连接可能需要的任何其他额外连接属性进行交互。

环境受过程限制

这不是 Airflow 或 Python 的限制,但(AFAIK 适用于每个主要操作系统)环境与进程的生命周期有关。export例如,当您在 bash 中创建一个变量时,您只是说明当您生成子进程时,您希望将该变量复制到子进程的环境中。这意味着父进程在创建后不能改变子进程的环境,子进程也不能改变父进程。

简而言之,只有流程本身在创建后才能改变其环境。考虑到工作进程是 Airflow 子进程,也很难控制其环境的创建。您可以做的是将环境变量写入文件,并在每个任务开始时有意地使用该文件中的覆盖更新当前环境。