Apache AIRFLOW - 如何将参数发送到Python脚本

jac*_*ack 2 python airflow

Admin->Connection我设置Conn Type S3.

基本上我在My Python脚本中有这个代码:

if __name__ == '__main__':
    AWS_ACCESS_KEY_ID = "..."
    AWS_SECRET_ACCESS_KEY = "..."
    AWS_DEFAULT_REGION = "..."
    Start_Work
Run Code Online (Sandbox Code Playgroud)

我想要做的是从Airflow调用我的脚本并将连接的参数传递给它(而不是在脚本中硬编码).

我怎么做?

编辑:让我们假设这是连接: 在此输入图像描述

如何访问每个提交的数据?

Jos*_*ini 5

你可以做的一件事就是导入provide_sessionutil然后根据它检索连接conn_id.然后,您可以将其传递给python运算符.

所以它看起来像这样:

from airflow.utils.db import provide_session

@provide_session
def get_conn(conn_id, session=None):
    conn = (session.query(Connection)
                   .filter(Connection.conn_id == conn_id)
                   .first())
    return conn

def my_python_function():

   conn = get_conn('connection_id')

   key_id = conn.extra_dejson.get('AWS_ACCESS_KEY_ID')
   secret_key = conn.extra_dejson.get('AWS_SECRET_ACCESS_KEY')
   default_region = conn.extra_dejson.get('DEFAULT_REGION')

task1 = PythonOperator(task_id='my_task', python_callable=my_python_function, dag=dag)

task1
Run Code Online (Sandbox Code Playgroud)

编辑:从python callable中删除了引号