如何在 PostgresOperator 中提取 XCOM 值

Vla*_*lav 4 python airflow

这里我推的是XCOM的价值:

task_get_username_bash = BashOperator(
                task_id='execute_bash',
                bash_command='whoami',
                xcom_push=True)
Run Code Online (Sandbox Code Playgroud)

所以在 XCOM 中,它存储为 {'return_value' : '$USER'} (在我的例子中 $USER = 'airflow')。

然后我想从 XCOM 中获取这个return_value

task_insert_new_row = PostgresOperator(
                task_id='insert_new_row',
                trigger_rule=TriggerRule.ALL_DONE,
                sql='''INSERT INTO table_name VALUES
                (%s, %s, %s);''',
                parameters=(uuid.uuid4().int % 123456789,
                            "{{ ti.xcom_pull(task_ids='execute_bash', key='return_value') }}",
                            datetime.now()))
Run Code Online (Sandbox Code Playgroud)

但 PostgresOperator 将宏引用解释为 str。如何在PostgresOperator中拉取XCOM?

Vla*_*lav 5

问题解决了:

task_insert_new_row = PostgresOperator(
                task_id='insert_new_row',
                trigger_rule=TriggerRule.ALL_DONE,
                sql='''INSERT INTO table_name VALUES
                (%s, '{{ ti.xcom_pull(task_ids='execute_bash', key='return_value') }}', %s);''',
                parameters=(uuid.uuid4().int % 123456789, datetime.now()))
Run Code Online (Sandbox Code Playgroud)