我们使用气流作为调度程序.我想在DAG中调用一个简单的bash运算符.bash脚本需要使用密码作为参数来进行进一步处理.
如何在气流(config/variables/connection)中安全地存储密码并在dag定义文件中访问它.
我是气流和Python的新手,所以我们将非常感谢代码片段.
通过Admin -> Connections,我们可以创建/修改连接的参数,但是我想知道是否可以通过API进行相同的操作,以便可以以编程方式设置连接
airflow.models.Connection似乎它只处理实际连接到实例,而不是将其保存到列表中。似乎应该已经实现了一个功能,但是我不确定在哪里可以找到该特定功能的文档。
我目前正在将之前在bash脚本中实现的工作流转换为Airflow DAG。在bash脚本中,我只是在运行时使用以下命令导出变量
export HADOOP_CONF_DIR="/etc/hadoop/conf"
Run Code Online (Sandbox Code Playgroud)
现在我想在Airflow中做同样的事情,但是还没有找到解决方案。我发现的一种解决方法是在os.environ[VAR_NAME]='some_text'任何方法或运算符外部设置变量,但这意味着在脚本加载后而不是在运行时将它们导出。
现在,当我尝试调用os.environ[VAR_NAME] = 'some_text'由PythonOperator 调用的函数时,它不起作用。我的代码看起来像这样
def set_env():
os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
os.environ['PATH'] = "somePath:" + os.environ['PATH']
os.environ['SPARK_HOME'] = "pathToSparkHome"
os.environ['PYTHONPATH'] = "somePythonPath"
os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()
set_env_operator = PythonOperator(
task_id='set_env_vars_NOT_WORKING',
python_callable=set_env,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
现在,当我的SparkSubmitOperator被执行时,我得到了异常:
Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
Run Code Online (Sandbox Code Playgroud)
我使用的情况下,这是相关的是,我有SparkSubmitOperator,我在那里作业提交到纱,因此无论是HADOOP_CONF_DIR或YARN_CONF_DIR必须在环境中进行设置。.bashrc对我来说,在我或其他任何配置中设置它们都是遗憾的,这就是为什么我需要在运行时设置它们。
最好在执行之前SparkSubmitOperator,先在操作员中设置它们,但如果有可能将它们作为参数传递给SparkSubmitOperator,那至少是可以的。
我的Airflow应用程序也在具有IAM角色的AWS EC2实例中运行。目前,我正在使用硬编码访问和密钥创建Airflow S3连接。但是我希望我的应用程序从此实例本身中获取此AWS凭证。
如何实现呢?
我有 Airflow 作业,它们在 EMR 集群上运行良好。我需要的是,假设我有 4 个需要 EMR 集群的气流作业,假设需要 20 分钟才能完成任务。为什么我们不能在 DAG 运行时创建 EMR 集群,一旦作业完成,它将终止创建的 EMR 集群。