这里我推的是XCOM的价值:
task_get_username_bash = BashOperator(
task_id='execute_bash',
bash_command='whoami',
xcom_push=True)
Run Code Online (Sandbox Code Playgroud)
所以在 XCOM 中,它存储为 {'return_value' : '$USER'} (在我的例子中 $USER = 'airflow')。
然后我想从 XCOM 中获取这个return_value:
task_insert_new_row = PostgresOperator(
task_id='insert_new_row',
trigger_rule=TriggerRule.ALL_DONE,
sql='''INSERT INTO table_name VALUES
(%s, %s, %s);''',
parameters=(uuid.uuid4().int % 123456789,
"{{ ti.xcom_pull(task_ids='execute_bash', key='return_value') }}",
datetime.now()))
Run Code Online (Sandbox Code Playgroud)
但 PostgresOperator 将宏引用解释为 str。如何在PostgresOperator中拉取XCOM?
问题解决了:
task_insert_new_row = PostgresOperator(
task_id='insert_new_row',
trigger_rule=TriggerRule.ALL_DONE,
sql='''INSERT INTO table_name VALUES
(%s, '{{ ti.xcom_pull(task_ids='execute_bash', key='return_value') }}', %s);''',
parameters=(uuid.uuid4().int % 123456789, datetime.now()))
Run Code Online (Sandbox Code Playgroud)