在Admin->Connection我设置Conn Type S3.
基本上我在My Python脚本中有这个代码:
if __name__ == '__main__':
AWS_ACCESS_KEY_ID = "..."
AWS_SECRET_ACCESS_KEY = "..."
AWS_DEFAULT_REGION = "..."
Start_Work
Run Code Online (Sandbox Code Playgroud)
我想要做的是从Airflow调用我的脚本并将连接的参数传递给它(而不是在脚本中硬编码).
我怎么做?
如何访问每个提交的数据?
你可以做的一件事就是导入provide_sessionutil然后根据它检索连接conn_id.然后,您可以将其传递给python运算符.
所以它看起来像这样:
from airflow.utils.db import provide_session
@provide_session
def get_conn(conn_id, session=None):
conn = (session.query(Connection)
.filter(Connection.conn_id == conn_id)
.first())
return conn
def my_python_function():
conn = get_conn('connection_id')
key_id = conn.extra_dejson.get('AWS_ACCESS_KEY_ID')
secret_key = conn.extra_dejson.get('AWS_SECRET_ACCESS_KEY')
default_region = conn.extra_dejson.get('DEFAULT_REGION')
task1 = PythonOperator(task_id='my_task', python_callable=my_python_function, dag=dag)
task1
Run Code Online (Sandbox Code Playgroud)
编辑:从python callable中删除了引号
| 归档时间: |
|
| 查看次数: |
1281 次 |
| 最近记录: |