conn_id 未定义

Sim*_*ton 5 airflow

我正在学习 Airflow,并试图了解连接是如何工作的。

我有一个带有以下代码的第一个 dag:

c = Connection(
    conn_id='aws_credentials',
    conn_type='Amazon Web Services',
    login='xxxxxxxx',
    password='xxxxxxxxx'
)


def list_keys():
    hook = S3Hook(aws_conn_id=c.conn_id)
    logging.info(f"Listing Keys from {bucket}/{prefix}")
    keys = hook.list_keys(bucket, prefix=prefix)
    for key in keys:
        logging.info(f"- s3://{bucket}/{key}")
Run Code Online (Sandbox Code Playgroud)

在这种情况下它工作正常。连接已顺利传递至 S3Hook。

然后我有第二个DAG:

redshift_connection = Connection(
    conn_id='redshift',
    conn_type='postgres',
    login='duser',
    password='xxxxxxxxxx',
    host='xxxxxxxx.us-west-2.redshift.amazonaws.com',
    port=5439,
    schema='db'
)

aws_connection = Connection(
    conn_id='aws_credentials',
    conn_type='Amazon Web Services',
    login='xxxxxxxxx',
    password='xxxxxxxx'
)

def load_data_to_redshift(*args, **kwargs):
    aws_hook = AwsHook(aws_connection.conn_id)
    credentials = aws_hook.get_credentials()
    redshift_hook = PostgresHook(redshift_connection.conn_id)
    sql_stmnt = sql_statements.COPY_STATIONS_SQL.format(aws_connection.login, aws_connection.password)
    redshift_hook.run(sql_stmnt)

dag = DAG(
    's3_to_Redshift',
    start_date=datetime.datetime.now()
    )

create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id=redshift_connection.conn_id,
    sql=sql_statements.CREATE_STATIONS_TABLE_SQL,
    dag=dag
    )
Run Code Online (Sandbox Code Playgroud)

该 dag 返回以下错误:The conn_idredshiftisn't defined

这是为什么?我的第一条和第二条有什么区别?为什么连接似乎在第一个示例中有效,而在第二种情况下无效?

谢谢。

Ser*_*kov 9

连接通常使用 UI 或 CLI 创建,如此处所述并由 Airflow 存储在数据库后端。然后,操作符和相应的挂钩将连接 ID 作为参数,并使用它来检索这些连接的用户名、密码等。

aws_credentials就您而言,我怀疑您使用 UI 或 CLI创建了与 ID 的连接。因此,当您将其 ID 传递给S3Hook它时,它会成功检索凭据(从 Airflow 的数据库后端,而不是从Connection您创建的对象)。

但是,您没有创建与 ID 的连接redshift,因此,AwsHook抱怨它未定义。您必须首先按照文档中的描述创建连接。

注意:在DAG​​代码中不定义连接的原因是DAG代码通常存储在版本控制系统(例如Git)中。在那里存储凭证会存在安全风险。