小编Jon*_*ira的帖子

如何从气流传感器中提取 xcom 值?

主要问题:我正在尝试创建 BigQuery 表(如果不存在)。

方法:使用 BigQueryTableSensor 检查表是否存在,并根据返回值,使用 BigQueryCreateEmptyTableOperator 创建或不创建新表。

问题:我无法使用 xcom 获取 BigQueryTableSensor 传感器的返回值。众所周知,poke 方法需要返回一个布尔值。

这就是我创建任务的方式:

check_if_table_exists = BigQueryTableSensor(
        task_id='check_if_table_exists',
        project_id='my_project',
        dataset_id='my_dataset',
        table_id='my_table',
        bigquery_conn_id='bigquery_default',
        timeout=120,
        do_xcom_push=True,
    )

# Output: INFO - Success criteria met. Exiting.

get_results = BashOperator(
        task_id='get_results',
        bash_command="echo {{ ti.xcom_pull(task_ids='check_if_table_exists') }}"
    )

# Output: INFO - Running command: echo None
Run Code Online (Sandbox Code Playgroud)

查看 Airflow 界面,我检查了 BigQueryTableSensor 没有推送任何内容:(

在此处输入图片说明

题:

  • 有没有办法获得我的传感器的返回值?

  • 有没有更好的方法来解决我的主要问题?也许使用 BigQueryOperator 和像“CREATE TABLE IF NOT EXISTS”这样的 sql 查询。

python airflow apache-airflow-xcom

4
推荐指数
2
解决办法
427
查看次数

标签 统计

airflow ×1

apache-airflow-xcom ×1

python ×1