我正在学习 Airflow,并试图了解连接是如何工作的。
我有一个带有以下代码的第一个 dag:
c = Connection(
conn_id='aws_credentials',
conn_type='Amazon Web Services',
login='xxxxxxxx',
password='xxxxxxxxx'
)
def list_keys():
hook = S3Hook(aws_conn_id=c.conn_id)
logging.info(f"Listing Keys from {bucket}/{prefix}")
keys = hook.list_keys(bucket, prefix=prefix)
for key in keys:
logging.info(f"- s3://{bucket}/{key}")
Run Code Online (Sandbox Code Playgroud)
在这种情况下它工作正常。连接已顺利传递至 S3Hook。
然后我有第二个DAG:
redshift_connection = Connection(
conn_id='redshift',
conn_type='postgres',
login='duser',
password='xxxxxxxxxx',
host='xxxxxxxx.us-west-2.redshift.amazonaws.com',
port=5439,
schema='db'
)
aws_connection = Connection(
conn_id='aws_credentials',
conn_type='Amazon Web Services',
login='xxxxxxxxx',
password='xxxxxxxx'
)
def load_data_to_redshift(*args, **kwargs):
aws_hook = AwsHook(aws_connection.conn_id)
credentials = aws_hook.get_credentials()
redshift_hook = PostgresHook(redshift_connection.conn_id)
sql_stmnt = sql_statements.COPY_STATIONS_SQL.format(aws_connection.login, aws_connection.password)
redshift_hook.run(sql_stmnt)
dag = DAG(
's3_to_Redshift',
start_date=datetime.datetime.now()
)
create_table = PostgresOperator(
task_id='create_table',
postgres_conn_id=redshift_connection.conn_id,
sql=sql_statements.CREATE_STATIONS_TABLE_SQL,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
该 dag 返回以下错误:The conn_idredshiftisn't defined
这是为什么?我的第一条和第二条有什么区别?为什么连接似乎在第一个示例中有效,而在第二种情况下无效?
谢谢。
连接通常使用 UI 或 CLI 创建,如此处所述,并由 Airflow 存储在数据库后端。然后,操作符和相应的挂钩将连接 ID 作为参数,并使用它来检索这些连接的用户名、密码等。
aws_credentials就您而言,我怀疑您使用 UI 或 CLI创建了与 ID 的连接。因此,当您将其 ID 传递给S3Hook它时,它会成功检索凭据(从 Airflow 的数据库后端,而不是从Connection您创建的对象)。
但是,您没有创建与 ID 的连接redshift,因此,AwsHook抱怨它未定义。您必须首先按照文档中的描述创建连接。
注意:在DAG代码中不定义连接的原因是DAG代码通常存储在版本控制系统(例如Git)中。在那里存储凭证会存在安全风险。