注意:这不是重复的
我必须从我的Airflow DAG. 实现这一目标的直接方法是SSHHook.
问题在于远程系统是一个EMR集群,它本身是在运行时(由上游任务)使用EmrCreateJobFlowOperator. 因此,虽然我可以掌握job_flow_id已启动的 EMR 集群(usingXCOM),但我需要的是将 anssh_conn_id传递给每个下游任务。
查看文档和代码,很明显 Airflow 将尝试conn_id在db和环境变量中查找此连接(使用),因此现在问题归结为能够在运行时设置这两个属性中的任何一个(从内部一个operator)。
这似乎是一个相当普遍的问题,因为如果无法实现,那么 的效用EmrCreateJobFlowOperator将受到严重阻碍;但我还没有遇到任何证明它的例子。
我上线了
Airflow v1.10Python 3.6.6emr-5.15 (如果需要可以升级)是的,如果您足够小心,您可以在运行时创建连接,甚至在 DAG 创建时也是如此。Airflow 在其内部模型上是完全透明的,因此您可以直接与底层 SqlAlchemy 交互。正如本答案中最初举例说明的那样,它很简单:
from airflow.models import Connection
from airflow import settings
def create_conn(username, password, host=None):
new_conn = Connection(conn_id=f'{username}_connection',
login=username,
host=host if host else None)
new_conn.set_password(password)
session = settings.Session()
session.add(new_conn)
session.commit()
Run Code Online (Sandbox Code Playgroud)
当然,您可以在其中与 EMR 连接可能需要的任何其他额外连接属性进行交互。
这不是 Airflow 或 Python 的限制,但(AFAIK 适用于每个主要操作系统)环境与进程的生命周期有关。export例如,当您在 bash 中创建一个变量时,您只是说明当您生成子进程时,您希望将该变量复制到子进程的环境中。这意味着父进程在创建后不能改变子进程的环境,子进程也不能改变父进程。
简而言之,只有流程本身在创建后才能改变其环境。考虑到工作进程是 Airflow 子进程,也很难控制其环境的创建。您可以做的是将环境变量写入文件,并在每个任务开始时有意地使用该文件中的覆盖更新当前环境。
| 归档时间: |
|
| 查看次数: |
9483 次 |
| 最近记录: |