小编Shi*_*hiv的帖子

Airflow xcom_pull 不提供相同上游任务实例运行的数据,而是提供最新数据

我正在创建一个 Airflow @daily DAG,它有一个get_daily_dataBigQueryGetDataOperator的上游任务,它根据 execution_date 和下游依赖任务(PythonOperator)获取数据,通过 xcom_pull 使用以上基于日期的数据。当我运行气流回填命令时,process_data_from_bq我正在执行 xcom_pull的下游任务,它只获取最近的数据,而不是下游任务期望的同一执行日期的数据。Airfow 文档说如果我们传递如果 xcom_pull 为 task_ids 传递单个字符串,则返回来自该任务的最新 XCom 值 但是它没有说明如何获取 DAG 执行的同一实例的数据。

我经历了一个相同的问题如何在同一个 DAG 运行中从其他任务实例中提取 xcom 值(不是最近的一个)?然而,那里给出的一个解决方案是我已经在做的。但似乎它不是正确的答案。

DAG 定义:

dag = DAG(
    'daily_motor',
    default_args=default_args,
    schedule_interval='@daily'
)

#This task creates data in a BigQuery table based on execution date
extract_daily_data  = BigQueryOperator(
    task_id='daily_data_extract',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    sql=policy_by_transaction_date_sql('{{ ds }}'), 
    destination_dataset_table='Test.daily_data_tmp',
    dag=dag)


get_daily_data = BigQueryGetDataOperator(
    task_id='get_daily_data',
    dataset_id='Test',
    table_id='daily_data_tmp',
    max_results='10000',
    dag=dag

)


#This is where I need to pull …
Run Code Online (Sandbox Code Playgroud)

python airflow apache-airflow-xcom

5
推荐指数
1
解决办法
1940
查看次数

标签 统计

airflow ×1

apache-airflow-xcom ×1

python ×1